flyrain commented on code in PR #11555:
URL: https://github.com/apache/iceberg/pull/11555#discussion_r1862650256


##########
core/src/main/java/org/apache/iceberg/TableMetadataUtil.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class TableMetadataUtil {

Review Comment:
   Would it make more sense to merge this class into `RewriteTablePathUtil`,
since this util is only used by the new action?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestLists;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableMetadataUtil;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePath>
+    implements RewriteTablePath {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+  private static final String RESULT_LOCATION = "file-list";
+
+  private String sourcePrefix;
+  private String targetPrefix;
+  private String startVersionName;
+  private String endVersionName;
+  private String stagingDir;
+
+  private final Table table;
+
+  RewriteTablePathSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RewriteTablePath self() {
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String 
tPrefix) {
+    Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be 
empty.", sPrefix);
+    this.sourcePrefix = sPrefix;
+    this.targetPrefix = tPrefix;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath startVersion(String sVersion) {
+    Preconditions.checkArgument(
+        sVersion != null && !sVersion.trim().isEmpty(),
+        "Start version('%s') cannot be empty.",
+        sVersion);
+    this.startVersionName = sVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath endVersion(String eVersion) {
+    Preconditions.checkArgument(
+        eVersion != null && !eVersion.trim().isEmpty(),
+        "End version('%s') cannot be empty.",
+        eVersion);
+    this.endVersionName = eVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath stagingLocation(String stagingLocation) {
+    Preconditions.checkArgument(
+        stagingLocation != null && !stagingLocation.isEmpty(),
+        "Staging location('%s') cannot be empty.",
+        stagingLocation);
+    this.stagingDir = stagingLocation;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    validateInputs();
+    JobGroupInfo info = newJobGroupInfo("COPY-TABLE", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    String resultLocation = rebuildMetadata();
+    return ImmutableRewriteTablePath.Result.builder()
+        .stagingLocation(stagingDir)
+        .fileListLocation(resultLocation)
+        .latestVersion(fileName(endVersionName))
+        .build();
+  }
+
+  private void validateInputs() {
+    Preconditions.checkArgument(
+        sourcePrefix != null && !sourcePrefix.isEmpty(),
+        "Source prefix('%s') cannot be empty.",
+        sourcePrefix);
+    Preconditions.checkArgument(
+        targetPrefix != null && !targetPrefix.isEmpty(),
+        "Target prefix('%s') cannot be empty.",
+        targetPrefix);
+    Preconditions.checkArgument(
+        !sourcePrefix.equals(targetPrefix),
+        "Source prefix cannot be the same as target prefix (%s)",
+        sourcePrefix);
+
+    validateAndSetEndVersion();
+    validateAndSetStartVersion();
+
+    if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + 
UUID.randomUUID() + "/";
+    } else if (!stagingDir.endsWith("/")) {
+      stagingDir = stagingDir + "/";
+    }
+  }
+
+  private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest 
table version.");
+      Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should 
not be null");
+      this.endVersionName = tableMetadata.metadataFileLocation();
+    } else {
+      this.endVersionName = validateVersion(tableMetadata, endVersionName);
+    }
+  }
+
+  private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (startVersionName != null) {
+      this.startVersionName = validateVersion(tableMetadata, startVersionName);
+    }
+  }
+
+  private String validateVersion(TableMetadata tableMetadata, String 
versionFileName) {
+    String versionFile = versionFile(tableMetadata, versionFileName);
+
+    Preconditions.checkNotNull(
+        versionFile, "Version file %s does not exist in metadata log.", 
versionFile);
+    Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", 
versionFile);
+    return versionFile;
+  }
+
+  private String versionFile(TableMetadata metadata, String versionFileName) {
+    if (versionInFilePath(metadata.metadataFileLocation(), versionFileName)) {
+      return metadata.metadataFileLocation();
+    }
+
+    for (MetadataLogEntry log : metadata.previousFiles()) {
+      if (versionInFilePath(log.file(), versionFileName)) {
+        return log.file();
+      }
+    }
+    return null;
+  }
+
+  private boolean versionInFilePath(String path, String version) {
+    return fileName(path).equals(version);
+  }
+
+  private String jobDesc() {
+    if (startVersionName != null) {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "up to version '%s'.",
+          sourcePrefix, targetPrefix, table.name(), endVersionName);
+    } else {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "from version '%s' to '%s'.",
+          sourcePrefix, targetPrefix, table.name(), startVersionName, 
endVersionName);
+    }
+  }
+
+  /**
+   *
+   *

Review Comment:
   Minor: remove the empty lines



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+
+public class RewriteTablePathUtil {
+
+  private RewriteTablePathUtil() {}
+
+  public static List<Pair<String, String>> rewriteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, 
specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> newDataFile(entry, spec, sourcePrefix, targetPrefix, 
writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static List<Pair<String, String>> rewriteDeleteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      PositionDeleteReaderWriter positionDeleteReaderWriter)
+      throws IOException {
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(
+              entry -> {
+                try {
+                  return newDeleteFile(
+                      entry,
+                      io,
+                      spec,
+                      sourcePrefix,
+                      targetPrefix,
+                      stagingLocation,
+                      writer,
+                      positionDeleteReaderWriter);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              })
+          .collect(Collectors.toList());
+    }
+  }
+
+  private static Pair<String, String> newDataFile(

Review Comment:
   Minor suggestion: how about a method name like `writeNewEntry()` or
`writeEntryWithNewDataFile()`?



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+
+public class RewriteTablePathUtil {
+
+  private RewriteTablePathUtil() {}
+
+  public static List<Pair<String, String>> rewriteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, 
specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> newDataFile(entry, spec, sourcePrefix, targetPrefix, 
writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static List<Pair<String, String>> rewriteDeleteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      PositionDeleteReaderWriter positionDeleteReaderWriter)
+      throws IOException {
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(
+              entry -> {
+                try {
+                  return newDeleteFile(
+                      entry,
+                      io,
+                      spec,
+                      sourcePrefix,
+                      targetPrefix,
+                      stagingLocation,
+                      writer,
+                      positionDeleteReaderWriter);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              })
+          .collect(Collectors.toList());
+    }
+  }
+
+  private static Pair<String, String> newDataFile(
+      ManifestEntry<DataFile> entry,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      ManifestWriter<DataFile> writer) {
+    DataFile dataFile = entry.file();
+    String sourceDataFilePath = dataFile.location();
+    Preconditions.checkArgument(
+        sourceDataFilePath.startsWith(sourcePrefix),
+        "Encountered data file %s not under the source prefix %s",
+        sourceDataFilePath,
+        sourcePrefix);
+    String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, 
targetPrefix);
+    DataFile newDataFile =
+        
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
+    appendEntryWithFile(entry, writer, newDataFile);
+    return Pair.of(sourceDataFilePath, newDataFile.location());
+  }
+
+  private static Pair<String, String> newDeleteFile(
+      ManifestEntry<DeleteFile> entry,
+      FileIO io,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      ManifestWriter<DeleteFile> writer,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+
+    DeleteFile file = entry.file();
+
+    switch (file.content()) {
+      case POSITION_DELETES:
+        DeleteFile posDeleteFile =
+            rewritePositionDeleteFile(
+                io, file, spec, sourcePrefix, stagingLocation, targetPrefix, 
posDeleteReaderWriter);
+        String targetDeleteFilePath = newPath(file.location(), sourcePrefix, 
targetPrefix);
+        DeleteFile movedFile =
+            FileMetadata.deleteFileBuilder(spec)
+                .copy(posDeleteFile)
+                .withPath(targetDeleteFilePath)
+                .build();
+        appendEntryWithFile(entry, writer, movedFile);
+        return Pair.of(posDeleteFile.location(), movedFile.location());
+      case EQUALITY_DELETES:
+        DeleteFile eqDeleteFile = newEqualityDeleteFile(file, spec, 
sourcePrefix, targetPrefix);
+        appendEntryWithFile(entry, writer, eqDeleteFile);
+        return Pair.of(file.location(), eqDeleteFile.location());
+      default:
+        throw new UnsupportedOperationException("Unsupported delete file type: 
" + file.content());
+    }
+  }
+
+  private static <F extends ContentFile<F>> void appendEntryWithFile(
+      ManifestEntry<F> entry, ManifestWriter<F> writer, F file) {
+
+    switch (entry.status()) {
+      case ADDED:
+        writer.add(file);
+        break;
+      case EXISTING:
+        writer.existing(
+            file, entry.snapshotId(), entry.dataSequenceNumber(), 
entry.fileSequenceNumber());
+        break;
+      case DELETED:
+        writer.delete(file, entry.dataSequenceNumber(), 
entry.fileSequenceNumber());
+        break;
+    }
+  }
+
+  public interface PositionDeleteReaderWriter {
+    CloseableIterable<Record> reader(InputFile inputFile, FileFormat format, 
PartitionSpec spec);
+
+    PositionDeleteWriter<Record> writer(
+        OutputFile outputFile,
+        FileFormat format,
+        PartitionSpec spec,
+        StructLike partition,
+        Schema rowSchema)
+        throws IOException;
+  }
+
+  private static DeleteFile rewritePositionDeleteFile(
+      FileIO io,
+      DeleteFile current,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String stagingLocation,
+      String targetPrefix,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+    String path = current.location();
+    if (!path.startsWith(sourcePrefix)) {
+      throw new UnsupportedOperationException(
+          "Expected delete file to be under the source prefix: "
+              + sourcePrefix
+              + " but was "
+              + path);
+    }
+    String newPath = stagingPath(path, stagingLocation);
+
+    OutputFile targetFile = io.newOutputFile(newPath);
+    InputFile sourceFile = io.newInputFile(path);
+
+    try (CloseableIterable<org.apache.iceberg.data.Record> reader =
+        posDeleteReaderWriter.reader(sourceFile, current.format(), spec)) {
+      org.apache.iceberg.data.Record record = null;
+      Schema rowSchema = null;
+      CloseableIterator<org.apache.iceberg.data.Record> recordIt = 
reader.iterator();
+
+      if (recordIt.hasNext()) {
+        record = recordIt.next();
+        rowSchema = record.get(2) != null ? spec.schema() : null;
+      }
+
+      PositionDeleteWriter<Record> writer =
+          posDeleteReaderWriter.writer(
+              targetFile, current.format(), spec, current.partition(), 
rowSchema);

Review Comment:
   If there is no record (`record == null`), we don't need to create a new writer:
   ```
   if (record != null) {
     try (PositionDeleteWriter<Record> writer =
         posDeleteReaderWriter.writer(
             targetFile, current.format(), spec, current.partition(),
             rowSchema)) {
       ...
     }
   }
   ```
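   For illustration, the full restructuring might look like this (a rough,
untested sketch based on the surrounding diff):
   ```
   if (record != null) {
     try (PositionDeleteWriter<Record> writer =
         posDeleteReaderWriter.writer(
             targetFile, current.format(), spec, current.partition(),
             rowSchema)) {
       // write the first record that was already read, then the rest
       writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix));
       while (recordIt.hasNext()) {
         record = recordIt.next();
         writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix));
       }
     }
   }
   ```
   Note the method still has to return a `DeleteFile` even when the input has
no records, so that case would need separate handling outside this block.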
   
   



##########
api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java:
##########
@@ -91,9 +92,10 @@ interface Result {
     String stagingLocation();
 
     /**
-     * Path to a comma-separated list of source and target paths for all files 
added to the table
-     * between startVersion and endVersion, including original data files and 
metadata files
-     * rewritten to staging.
+     * Result file list location. This file contains a 'copy plan', a 
comma-separated list of all

Review Comment:
   We could give an example of the format to make it clear, like this:
   ```
   sourcepath/datafile1.parquet targetpath/datafile1.parquet,
   sourcepath/datafile2.parquet targetpath/datafile2.parquet,
   ...
   ```



##########
core/src/main/java/org/apache/iceberg/TableMetadataUtil.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class TableMetadataUtil {
+  private TableMetadataUtil() {}
+
+  public static TableMetadata replacePaths(
+      TableMetadata metadata, String sourcePrefix, String targetPrefix) {
+    String newLocation = newPath(metadata.location(), sourcePrefix, 
targetPrefix);
+    List<Snapshot> newSnapshots = updatePathInSnapshots(metadata, 
sourcePrefix, targetPrefix);
+    List<MetadataLogEntry> metadataLogEntries =
+        updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix);
+    long snapshotId =
+        metadata.currentSnapshot() == null ? -1 : 
metadata.currentSnapshot().snapshotId();
+    Map<String, String> properties =
+        updateProperties(metadata.properties(), sourcePrefix, targetPrefix);
+
+    return new TableMetadata(
+        null,
+        metadata.formatVersion(),
+        metadata.uuid(),
+        newLocation,
+        metadata.lastSequenceNumber(),
+        metadata.lastUpdatedMillis(),
+        metadata.lastColumnId(),
+        metadata.currentSchemaId(),
+        metadata.schemas(),
+        metadata.defaultSpecId(),
+        metadata.specs(),
+        metadata.lastAssignedPartitionId(),
+        metadata.defaultSortOrderId(),
+        metadata.sortOrders(),
+        properties,
+        snapshotId,
+        newSnapshots,
+        null,
+        metadata.snapshotLog(),
+        metadataLogEntries,
+        metadata.refs(),
+        metadata.statisticsFiles(),
+        metadata.partitionStatisticsFiles(),
+        metadata.changes());
+  }
+
+  private static Map<String, String> updateProperties(
+      Map<String, String> tableProperties, String sourcePrefix, String 
targetPrefix) {
+    Map properties = Maps.newHashMap(tableProperties);

Review Comment:
   Minor: `Map` -> `Map<String, String>`
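   i.e. (sketch of the suggested one-line change):
   ```
   Map<String, String> properties = Maps.newHashMap(tableProperties);
   ```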



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+
+public class RewriteTablePathUtil {
+
+  private RewriteTablePathUtil() {}
+
+  public static List<Pair<String, String>> rewriteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, 
specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> newDataFile(entry, spec, sourcePrefix, targetPrefix, 
writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static List<Pair<String, String>> rewriteDeleteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      PositionDeleteReaderWriter positionDeleteReaderWriter)
+      throws IOException {
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(
+              entry -> {
+                try {
+                  return newDeleteFile(
+                      entry,
+                      io,
+                      spec,
+                      sourcePrefix,
+                      targetPrefix,
+                      stagingLocation,
+                      writer,
+                      positionDeleteReaderWriter);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              })
+          .collect(Collectors.toList());
+    }
+  }
+
+  private static Pair<String, String> newDataFile(
+      ManifestEntry<DataFile> entry,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      ManifestWriter<DataFile> writer) {
+    DataFile dataFile = entry.file();
+    String sourceDataFilePath = dataFile.location();
+    Preconditions.checkArgument(
+        sourceDataFilePath.startsWith(sourcePrefix),
+        "Encountered data file %s not under the source prefix %s",
+        sourceDataFilePath,
+        sourcePrefix);
+    String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, 
targetPrefix);
+    DataFile newDataFile =
+        
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
+    appendEntryWithFile(entry, writer, newDataFile);
+    return Pair.of(sourceDataFilePath, newDataFile.location());
+  }
+
+  private static Pair<String, String> newDeleteFile(
+      ManifestEntry<DeleteFile> entry,
+      FileIO io,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      ManifestWriter<DeleteFile> writer,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+
+    DeleteFile file = entry.file();
+
+    switch (file.content()) {
+      case POSITION_DELETES:
+        DeleteFile posDeleteFile =
+            rewritePositionDeleteFile(
+                io, file, spec, sourcePrefix, stagingLocation, targetPrefix, 
posDeleteReaderWriter);
+        String targetDeleteFilePath = newPath(file.location(), sourcePrefix, 
targetPrefix);
+        DeleteFile movedFile =
+            FileMetadata.deleteFileBuilder(spec)
+                .copy(posDeleteFile)
+                .withPath(targetDeleteFilePath)
+                .build();
+        appendEntryWithFile(entry, writer, movedFile);
+        return Pair.of(posDeleteFile.location(), movedFile.location());
+      case EQUALITY_DELETES:
+        DeleteFile eqDeleteFile = newEqualityDeleteFile(file, spec, 
sourcePrefix, targetPrefix);
+        appendEntryWithFile(entry, writer, eqDeleteFile);
+        return Pair.of(file.location(), eqDeleteFile.location());
+      default:
+        throw new UnsupportedOperationException("Unsupported delete file type: 
" + file.content());
+    }
+  }
+
+  private static <F extends ContentFile<F>> void appendEntryWithFile(
+      ManifestEntry<F> entry, ManifestWriter<F> writer, F file) {
+
+    switch (entry.status()) {
+      case ADDED:
+        writer.add(file);
+        break;
+      case EXISTING:
+        writer.existing(
+            file, entry.snapshotId(), entry.dataSequenceNumber(), 
entry.fileSequenceNumber());
+        break;
+      case DELETED:
+        writer.delete(file, entry.dataSequenceNumber(), 
entry.fileSequenceNumber());
+        break;
+    }
+  }
+
+  public interface PositionDeleteReaderWriter {
+    CloseableIterable<Record> reader(InputFile inputFile, FileFormat format, 
PartitionSpec spec);
+
+    PositionDeleteWriter<Record> writer(
+        OutputFile outputFile,
+        FileFormat format,
+        PartitionSpec spec,
+        StructLike partition,
+        Schema rowSchema)
+        throws IOException;
+  }
+
+  private static DeleteFile rewritePositionDeleteFile(
+      FileIO io,
+      DeleteFile current,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String stagingLocation,
+      String targetPrefix,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+    String path = current.location();
+    if (!path.startsWith(sourcePrefix)) {
+      throw new UnsupportedOperationException(
+          "Expected delete file to be under the source prefix: "
+              + sourcePrefix
+              + " but was "
+              + path);
+    }
+    String newPath = stagingPath(path, stagingLocation);
+
+    OutputFile targetFile = io.newOutputFile(newPath);
+    InputFile sourceFile = io.newInputFile(path);
+
+    try (CloseableIterable<org.apache.iceberg.data.Record> reader =

Review Comment:
   `org.apache.iceberg.data.Record` -> `Record`?



##########
core/src/main/java/org/apache/iceberg/ManifestLists.java:
##########
@@ -50,7 +50,7 @@ static List<ManifestFile> read(InputFile manifestList) {
     }
   }
 
-  static ManifestListWriter write(
+  public static ManifestListWriter write(

Review Comment:
   Minor question: should we make the class `ManifestListWriter` public as well 
due to this change?



##########
core/src/main/java/org/apache/iceberg/TableMetadataUtil.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class TableMetadataUtil {
+  private TableMetadataUtil() {}
+
+  public static TableMetadata replacePaths(
+      TableMetadata metadata, String sourcePrefix, String targetPrefix) {
+    String newLocation = newPath(metadata.location(), sourcePrefix, 
targetPrefix);
+    List<Snapshot> newSnapshots = updatePathInSnapshots(metadata, 
sourcePrefix, targetPrefix);
+    List<MetadataLogEntry> metadataLogEntries =
+        updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix);
+    long snapshotId =
+        metadata.currentSnapshot() == null ? -1 : 
metadata.currentSnapshot().snapshotId();
+    Map<String, String> properties =
+        updateProperties(metadata.properties(), sourcePrefix, targetPrefix);
+
+    return new TableMetadata(
+        null,
+        metadata.formatVersion(),
+        metadata.uuid(),
+        newLocation,
+        metadata.lastSequenceNumber(),
+        metadata.lastUpdatedMillis(),
+        metadata.lastColumnId(),
+        metadata.currentSchemaId(),
+        metadata.schemas(),
+        metadata.defaultSpecId(),
+        metadata.specs(),
+        metadata.lastAssignedPartitionId(),
+        metadata.defaultSortOrderId(),
+        metadata.sortOrders(),
+        properties,
+        snapshotId,
+        newSnapshots,
+        null,
+        metadata.snapshotLog(),
+        metadataLogEntries,
+        metadata.refs(),
+        metadata.statisticsFiles(),
+        metadata.partitionStatisticsFiles(),

Review Comment:
   We will need to rewrite statistics file paths as well, but I'm OK with
supporting that in a follow-up PR.
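   For reference, a follow-up could rewrite them the same way the snapshot and
metadata log paths are updated. A rough, untested sketch, assuming
`GenericStatisticsFile` keeps its `(snapshotId, path, fileSizeInBytes,
fileFooterSizeInBytes, blobMetadata)` constructor and reusing the `newPath()`
helper from this class:
   ```
   private static List<StatisticsFile> updatePathInStatisticsFiles(
       TableMetadata metadata, String sourcePrefix, String targetPrefix) {
     List<StatisticsFile> newStatsFiles = Lists.newArrayList();
     for (StatisticsFile stats : metadata.statisticsFiles()) {
       // keep everything except the path, which gets the new prefix
       newStatsFiles.add(
           new GenericStatisticsFile(
               stats.snapshotId(),
               newPath(stats.path(), sourcePrefix, targetPrefix),
               stats.fileSizeInBytes(),
               stats.fileFooterSizeInBytes(),
               stats.blobMetadata()));
     }
     return newStatsFiles;
   }
   ```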



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+
+public class RewriteTablePathUtil {
+
+  private RewriteTablePathUtil() {}
+
+  public static List<Pair<String, String>> rewriteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, 
specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> newDataFile(entry, spec, sourcePrefix, targetPrefix, 
writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static List<Pair<String, String>> rewriteDeleteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      PositionDeleteReaderWriter positionDeleteReaderWriter)
+      throws IOException {
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(
+              entry -> {
+                try {
+                  return newDeleteFile(
+                      entry,
+                      io,
+                      spec,
+                      sourcePrefix,
+                      targetPrefix,
+                      stagingLocation,
+                      writer,
+                      positionDeleteReaderWriter);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              })
+          .collect(Collectors.toList());
+    }
+  }
+
+  private static Pair<String, String> newDataFile(
+      ManifestEntry<DataFile> entry,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      ManifestWriter<DataFile> writer) {
+    DataFile dataFile = entry.file();
+    String sourceDataFilePath = dataFile.location();
+    Preconditions.checkArgument(
+        sourceDataFilePath.startsWith(sourcePrefix),
+        "Encountered data file %s not under the source prefix %s",
+        sourceDataFilePath,
+        sourcePrefix);
+    String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, 
targetPrefix);
+    DataFile newDataFile =
+        
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
+    appendEntryWithFile(entry, writer, newDataFile);
+    return Pair.of(sourceDataFilePath, newDataFile.location());
+  }
+
+  private static Pair<String, String> newDeleteFile(
+      ManifestEntry<DeleteFile> entry,
+      FileIO io,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      ManifestWriter<DeleteFile> writer,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+
+    DeleteFile file = entry.file();
+
+    switch (file.content()) {
+      case POSITION_DELETES:
+        DeleteFile posDeleteFile =
+            rewritePositionDeleteFile(
+                io, file, spec, sourcePrefix, stagingLocation, targetPrefix, 
posDeleteReaderWriter);
+        String targetDeleteFilePath = newPath(file.location(), sourcePrefix, 
targetPrefix);
+        DeleteFile movedFile =
+            FileMetadata.deleteFileBuilder(spec)
+                .copy(posDeleteFile)
+                .withPath(targetDeleteFilePath)
+                .build();
+        appendEntryWithFile(entry, writer, movedFile);
+        return Pair.of(posDeleteFile.location(), movedFile.location());
+      case EQUALITY_DELETES:
+        DeleteFile eqDeleteFile = newEqualityDeleteFile(file, spec, 
sourcePrefix, targetPrefix);
+        appendEntryWithFile(entry, writer, eqDeleteFile);
+        return Pair.of(file.location(), eqDeleteFile.location());
+      default:
+        throw new UnsupportedOperationException("Unsupported delete file type: 
" + file.content());
+    }
+  }
+
+  private static <F extends ContentFile<F>> void appendEntryWithFile(
+      ManifestEntry<F> entry, ManifestWriter<F> writer, F file) {
+
+    switch (entry.status()) {
+      case ADDED:
+        writer.add(file);
+        break;
+      case EXISTING:
+        writer.existing(
+            file, entry.snapshotId(), entry.dataSequenceNumber(), 
entry.fileSequenceNumber());
+        break;
+      case DELETED:
+        writer.delete(file, entry.dataSequenceNumber(), 
entry.fileSequenceNumber());
+        break;
+    }
+  }
+
+  public interface PositionDeleteReaderWriter {
+    CloseableIterable<Record> reader(InputFile inputFile, FileFormat format, 
PartitionSpec spec);
+
+    PositionDeleteWriter<Record> writer(
+        OutputFile outputFile,
+        FileFormat format,
+        PartitionSpec spec,
+        StructLike partition,
+        Schema rowSchema)
+        throws IOException;
+  }
+
+  private static DeleteFile rewritePositionDeleteFile(
+      FileIO io,
+      DeleteFile current,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String stagingLocation,
+      String targetPrefix,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+    String path = current.location();
+    if (!path.startsWith(sourcePrefix)) {
+      throw new UnsupportedOperationException(
+          "Expected delete file to be under the source prefix: "
+              + sourcePrefix
+              + " but was "
+              + path);
+    }
+    String newPath = stagingPath(path, stagingLocation);
+
+    OutputFile targetFile = io.newOutputFile(newPath);
+    InputFile sourceFile = io.newInputFile(path);
+
+    try (CloseableIterable<org.apache.iceberg.data.Record> reader =
+        posDeleteReaderWriter.reader(sourceFile, current.format(), spec)) {
+      org.apache.iceberg.data.Record record = null;

Review Comment:
   `org.apache.iceberg.data.Record` -> `Record`?



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+
+public class RewriteTablePathUtil {
+
+  private RewriteTablePathUtil() {}
+
+  public static List<Pair<String, String>> rewriteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, 
specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> newDataFile(entry, spec, sourcePrefix, targetPrefix, 
writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static List<Pair<String, String>> rewriteDeleteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      PositionDeleteReaderWriter positionDeleteReaderWriter)
+      throws IOException {
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(
+              entry -> {
+                try {
+                  return newDeleteFile(
+                      entry,
+                      io,
+                      spec,
+                      sourcePrefix,
+                      targetPrefix,
+                      stagingLocation,
+                      writer,
+                      positionDeleteReaderWriter);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }

Review Comment:
   Minor suggestion: Wrap the IOException within the method `newDeleteFile()`,
like this?
   ```
   try {
     DeleteFile posDeleteFile =
         rewritePositionDeleteFile(
             io, file, spec, sourcePrefix, stagingLocation,
             targetPrefix, posDeleteReaderWriter);

     String targetDeleteFilePath =
         newPath(file.location(), sourcePrefix, targetPrefix);
     DeleteFile movedFile =
         FileMetadata.deleteFileBuilder(spec)
             .copy(posDeleteFile)
             .withPath(targetDeleteFilePath)
             .build();
     appendEntryWithFile(entry, writer, movedFile);
     return Pair.of(posDeleteFile.location(), movedFile.location());
   } catch (IOException e) {
     throw new RuntimeException("failed to rewrite pos delete file", e);
   }
   ```



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+
+public class RewriteTablePathUtil {
+
+  private RewriteTablePathUtil() {}
+
+  public static List<Pair<String, String>> rewriteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, 
specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> newDataFile(entry, spec, sourcePrefix, targetPrefix, 
writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static List<Pair<String, String>> rewriteDeleteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      PositionDeleteReaderWriter positionDeleteReaderWriter)
+      throws IOException {
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(
+              entry -> {
+                try {
+                  return newDeleteFile(
+                      entry,
+                      io,
+                      spec,
+                      sourcePrefix,
+                      targetPrefix,
+                      stagingLocation,
+                      writer,
+                      positionDeleteReaderWriter);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              })
+          .collect(Collectors.toList());
+    }
+  }
+
+  private static Pair<String, String> newDataFile(
+      ManifestEntry<DataFile> entry,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      ManifestWriter<DataFile> writer) {
+    DataFile dataFile = entry.file();
+    String sourceDataFilePath = dataFile.location();
+    Preconditions.checkArgument(
+        sourceDataFilePath.startsWith(sourcePrefix),
+        "Encountered data file %s not under the source prefix %s",
+        sourceDataFilePath,
+        sourcePrefix);
+    String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, 
targetPrefix);
+    DataFile newDataFile =
+        
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
+    appendEntryWithFile(entry, writer, newDataFile);
+    return Pair.of(sourceDataFilePath, newDataFile.location());
+  }
+
+  private static Pair<String, String> newDeleteFile(
+      ManifestEntry<DeleteFile> entry,
+      FileIO io,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      ManifestWriter<DeleteFile> writer,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+
+    DeleteFile file = entry.file();
+
+    switch (file.content()) {
+      case POSITION_DELETES:
+        DeleteFile posDeleteFile =
+            rewritePositionDeleteFile(
+                io, file, spec, sourcePrefix, stagingLocation, targetPrefix, posDeleteReaderWriter);
+        String targetDeleteFilePath = newPath(file.location(), sourcePrefix, targetPrefix);
+        DeleteFile movedFile =
+            FileMetadata.deleteFileBuilder(spec)
+                .copy(posDeleteFile)
+                .withPath(targetDeleteFilePath)
+                .build();
+        appendEntryWithFile(entry, writer, movedFile);
+        return Pair.of(posDeleteFile.location(), movedFile.location());
+      case EQUALITY_DELETES:
+        DeleteFile eqDeleteFile = newEqualityDeleteFile(file, spec, sourcePrefix, targetPrefix);
+        appendEntryWithFile(entry, writer, eqDeleteFile);
+        return Pair.of(file.location(), eqDeleteFile.location());
+      default:
+        throw new UnsupportedOperationException("Unsupported delete file type: " + file.content());
+    }
+  }
+
+  private static <F extends ContentFile<F>> void appendEntryWithFile(
+      ManifestEntry<F> entry, ManifestWriter<F> writer, F file) {
+
+    switch (entry.status()) {
+      case ADDED:
+        writer.add(file);
+        break;
+      case EXISTING:
+        writer.existing(
+            file, entry.snapshotId(), entry.dataSequenceNumber(), entry.fileSequenceNumber());
+        break;
+      case DELETED:
+        writer.delete(file, entry.dataSequenceNumber(), entry.fileSequenceNumber());
+        break;
+    }
+  }
+
+  public interface PositionDeleteReaderWriter {
+    CloseableIterable<Record> reader(InputFile inputFile, FileFormat format, PartitionSpec spec);
+
+    PositionDeleteWriter<Record> writer(
+        OutputFile outputFile,
+        FileFormat format,
+        PartitionSpec spec,
+        StructLike partition,
+        Schema rowSchema)
+        throws IOException;
+  }
+
+  private static DeleteFile rewritePositionDeleteFile(
+      FileIO io,
+      DeleteFile current,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String stagingLocation,
+      String targetPrefix,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+    String path = current.location();
+    if (!path.startsWith(sourcePrefix)) {
+      throw new UnsupportedOperationException(
+          "Expected delete file to be under the source prefix: "
+              + sourcePrefix
+              + " but was "
+              + path);
+    }
+    String newPath = stagingPath(path, stagingLocation);
+
+    OutputFile targetFile = io.newOutputFile(newPath);
+    InputFile sourceFile = io.newInputFile(path);
+
+    try (CloseableIterable<org.apache.iceberg.data.Record> reader =
+        posDeleteReaderWriter.reader(sourceFile, current.format(), spec)) {
+      org.apache.iceberg.data.Record record = null;
+      Schema rowSchema = null;
+      CloseableIterator<org.apache.iceberg.data.Record> recordIt = reader.iterator();
+
+      if (recordIt.hasNext()) {
+        record = recordIt.next();
+        rowSchema = record.get(2) != null ? spec.schema() : null;
+      }
+
+      PositionDeleteWriter<Record> writer =
+          posDeleteReaderWriter.writer(
+              targetFile, current.format(), spec, current.partition(), rowSchema);
+
+      try (writer) {
+        if (record != null) {
+          writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix));
+        }
+
+        while (recordIt.hasNext()) {
+          record = recordIt.next();
+          writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix));

Review Comment:
   Is it possible that `record` is null here?
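   A minimal sketch (not the PR's code) of the flow in question, with the null handling made explicit. `WriterFactory` and `rewrite(...)` are hypothetical stand-ins for `posDeleteReaderWriter` and `newPositionDeleteRecord(...)`, and the prefix swap is simplified compared to `newPath(...)`:

```java
import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;

class PositionDeleteCopySketch {
  /** Hypothetical factory standing in for posDeleteReaderWriter.writer(...). */
  interface WriterFactory {
    PositionDeleteWriter<Record> create(Schema rowSchema) throws IOException;
  }

  static void copyAll(
      CloseableIterable<Record> reader,
      PartitionSpec spec,
      WriterFactory writerFactory,
      String sourcePrefix,
      String targetPrefix)
      throws IOException {
    try (CloseableIterator<Record> it = reader.iterator()) {
      // The first record is peeked at only to decide whether row data is present;
      // it is null exactly when the delete file has no records.
      Record first = it.hasNext() ? it.next() : null;
      Schema rowSchema = first != null && first.get(2) != null ? spec.schema() : null;

      try (PositionDeleteWriter<Record> writer = writerFactory.create(rowSchema)) {
        if (first != null) {
          writer.write(rewrite(first, sourcePrefix, targetPrefix));
        }

        while (it.hasNext()) {
          // next() does not return null for a well-formed delete file, so no extra check here
          writer.write(rewrite(it.next(), sourcePrefix, targetPrefix));
        }
      }
    }
  }

  // Simplified prefix swap; the PR's newPath(...) additionally normalizes trailing slashes.
  private static PositionDelete<Record> rewrite(
      Record record, String sourcePrefix, String targetPrefix) {
    CharSequence path = record.get(0, CharSequence.class);
    String newPath = targetPrefix + path.toString().substring(sourcePrefix.length());
    return PositionDelete.<Record>create()
        .set(newPath, record.get(1, Long.class), record.get(2, Record.class));
  }
}
```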



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+
+public class RewriteTablePathUtil {
+
+  private RewriteTablePathUtil() {}
+
+  public static List<Pair<String, String>> rewriteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> newDataFile(entry, spec, sourcePrefix, targetPrefix, writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static List<Pair<String, String>> rewriteDeleteManifest(
+      FileIO io,
+      int format,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      PositionDeleteReaderWriter positionDeleteReaderWriter)
+      throws IOException {
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)

Review Comment:
   Should we use Spark to parallelize across individual delete files? Performance could be bad when a manifest file references many delete files, since the current approach only parallelizes per manifest file: each Spark task handles one whole manifest file.
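   One rough sketch of that direction (not part of this PR): list the delete files of one manifest on the driver, then distribute the per-file rewrite. `rewriteOneDeleteFile(...)` is a placeholder for the logic in `newDeleteFile(...)`, and `spark`, `io`, `specsById`, `manifestFile`, `sourcePrefix`, and `targetPrefix` are assumed from the surrounding action; it also needs `org.apache.spark.api.java.function.MapFunction` on top of the imports already in this file:

```java
// Collect the delete files of one manifest on the driver ...
List<DeleteFile> deleteFiles = Lists.newArrayList();
try (ManifestReader<DeleteFile> reader =
    ManifestFiles.readDeleteManifest(manifestFile, io, specsById)) {
  for (DeleteFile file : reader) {
    deleteFiles.add(file.copy()); // copy: the reader may reuse its container
  }
}

// ... then let each Spark task rewrite a single delete file.
// Assumes deleteFiles is non-empty; otherwise skip this manifest entirely.
Dataset<DeleteFile> deleteFileDS =
    spark.createDataset(deleteFiles, Encoders.javaSerialization(DeleteFile.class));

List<Tuple2<String, String>> copyPlan =
    deleteFileDS
        .repartition(deleteFiles.size())
        .map(
            (MapFunction<DeleteFile, Tuple2<String, String>>)
                file -> rewriteOneDeleteFile(file, sourcePrefix, targetPrefix), // placeholder
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
        .collectAsList();
```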



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestLists;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableMetadataUtil;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePath>
+    implements RewriteTablePath {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+  private static final String RESULT_LOCATION = "file-list";
+
+  private String sourcePrefix;
+  private String targetPrefix;
+  private String startVersionName;
+  private String endVersionName;
+  private String stagingDir;
+
+  private final Table table;
+
+  RewriteTablePathSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RewriteTablePath self() {
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String tPrefix) {
+    Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be empty.", sPrefix);
+    this.sourcePrefix = sPrefix;
+    this.targetPrefix = tPrefix;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath startVersion(String sVersion) {
+    Preconditions.checkArgument(
+        sVersion != null && !sVersion.trim().isEmpty(),
+        "Start version('%s') cannot be empty.",
+        sVersion);
+    this.startVersionName = sVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath endVersion(String eVersion) {
+    Preconditions.checkArgument(
+        eVersion != null && !eVersion.trim().isEmpty(),
+        "End version('%s') cannot be empty.",
+        eVersion);
+    this.endVersionName = eVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath stagingLocation(String stagingLocation) {
+    Preconditions.checkArgument(
+        stagingLocation != null && !stagingLocation.isEmpty(),
+        "Staging location('%s') cannot be empty.",
+        stagingLocation);
+    this.stagingDir = stagingLocation;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    validateInputs();
+    JobGroupInfo info = newJobGroupInfo("COPY-TABLE", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    String resultLocation = rebuildMetadata();
+    return ImmutableRewriteTablePath.Result.builder()
+        .stagingLocation(stagingDir)
+        .fileListLocation(resultLocation)
+        .latestVersion(fileName(endVersionName))
+        .build();
+  }
+
+  private void validateInputs() {
+    Preconditions.checkArgument(
+        sourcePrefix != null && !sourcePrefix.isEmpty(),
+        "Source prefix('%s') cannot be empty.",
+        sourcePrefix);
+    Preconditions.checkArgument(
+        targetPrefix != null && !targetPrefix.isEmpty(),
+        "Target prefix('%s') cannot be empty.",
+        targetPrefix);
+    Preconditions.checkArgument(
+        !sourcePrefix.equals(targetPrefix),
+        "Source prefix cannot be the same as target prefix (%s)",
+        sourcePrefix);
+
+    validateAndSetEndVersion();
+    validateAndSetStartVersion();
+
+    if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + UUID.randomUUID() + "/";
+    } else if (!stagingDir.endsWith("/")) {
+      stagingDir = stagingDir + "/";
+    }
+  }
+
+  private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+
+    if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest table version.");
+      Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should not be null");
+      this.endVersionName = tableMetadata.metadataFileLocation();
+    } else {
+      this.endVersionName = validateVersion(tableMetadata, endVersionName);
+    }
+  }
+
+  private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+
+    if (startVersionName != null) {
+      this.startVersionName = validateVersion(tableMetadata, startVersionName);
+    }
+  }
+
+  private String validateVersion(TableMetadata tableMetadata, String versionFileName) {
+    String versionFile = versionFile(tableMetadata, versionFileName);
+
+    Preconditions.checkNotNull(
+        versionFile, "Version file %s does not exist in metadata log.", versionFileName);
+    Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", versionFile);
+    return versionFile;
+  }
+
+  private String versionFile(TableMetadata metadata, String versionFileName) {
+    if (versionInFilePath(metadata.metadataFileLocation(), versionFileName)) {
+      return metadata.metadataFileLocation();
+    }
+
+    for (MetadataLogEntry log : metadata.previousFiles()) {
+      if (versionInFilePath(log.file(), versionFileName)) {
+        return log.file();
+      }
+    }
+    return null;
+  }
+
+  private boolean versionInFilePath(String path, String version) {
+    return fileName(path).equals(version);
+  }
+
+  private String jobDesc() {
+    if (startVersionName != null) {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of table %s,"
+              + " from version '%s' to '%s'.",
+          sourcePrefix, targetPrefix, table.name(), startVersionName, endVersionName);
+    } else {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of table %s,"
+              + " up to version '%s'.",
+          sourcePrefix, targetPrefix, table.name(), endVersionName);
+    }
+  }
+
+  /**
+   *
+   *
+   * <ul>
+   *   <li>Rebuild version files to staging
+   *   <li>Rebuild manifest list files to staging
+   *   <li>Rebuild manifest to staging
+   *   <li>Get all files needed to move
+   * </ul>
+   */
+  private String rebuildMetadata() {
+    TableMetadata startMetadata =
+        startVersionName != null
+            ? ((HasTableOperations) newStaticTable(startVersionName, table.io()))
+                .operations()
+                .current()
+            : null;
+    TableMetadata endMetadata =
+        ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();
+
+    Preconditions.checkArgument(
+        endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(),
+        "Statistic files are not supported yet.");
+
+    // rebuild version files
+    RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
+    Set<Snapshot> diffSnapshots =
+        getDiffSnapshotIds(startMetadata, rewriteVersionResult.toRewrite());
+
+    Set<String> manifestsToRewrite = manifestsToRewrite(diffSnapshots, startMetadata);
+    Set<Snapshot> validSnapshots =
+        Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+
+    // rebuild manifest-list files
+    RewriteResult<ManifestFile> rewriteManifestListResult =
+        validSnapshots.stream()
+            .map(snapshot -> rewriteManifestList(snapshot, endMetadata, manifestsToRewrite))
+            .reduce(new RewriteResult<>(), RewriteResult::new);
+
+    // rebuild manifest files
+    Set<Pair<String, String>> contentFilesToMove =
+        rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite());
+
+    Set<Pair<String, String>> movePlan = Sets.newHashSet();
+    movePlan.addAll(rewriteVersionResult.copyPlan());
+    movePlan.addAll(rewriteManifestListResult.copyPlan());
+    movePlan.addAll(contentFilesToMove);
+
+    return saveFileList(movePlan);
+  }
+
+  private String saveFileList(Set<Pair<String, String>> filesToMove) {
+    List<Tuple2<String, String>> fileList =
+        filesToMove.stream()
+            .map(p -> Tuple2.apply(p.first(), p.second()))
+            .collect(Collectors.toList());
+    Dataset<Tuple2<String, String>> fileListDataset =
+        spark().createDataset(fileList, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+    String fileListPath = stagingDir + RESULT_LOCATION;
+    fileListDataset
+        .repartition(1)
+        .write()
+        .mode(SaveMode.Overwrite)
+        .format("csv")
+        .save(fileListPath);
+    return fileListPath;
+  }
+
+  private Set<Snapshot> getDiffSnapshotIds(
+      TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
+    if (startMetadata == null) {
+      return allSnapshots;
+    } else {
+      Set<Long> startSnapshotIds =
+          startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+      return allSnapshots.stream()
+          .filter(s -> !startSnapshotIds.contains(s.snapshotId()))
+          .collect(Collectors.toSet());
+    }
+  }
+
+  private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
+    RewriteResult<Snapshot> result = new RewriteResult<>();
+    result.toRewrite().addAll(endMetadata.snapshots());
+    result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+
+    List<MetadataLogEntry> versions = endMetadata.previousFiles();
+    for (int i = versions.size() - 1; i >= 0; i--) {
+      String versionFilePath = versions.get(i).file();
+      if (versionFilePath.equals(startVersionName)) {
+        break;
+      }
+
+      Preconditions.checkArgument(
+          fileExist(versionFilePath),
+          String.format("Version file %s doesn't exist", versionFilePath));
+      TableMetadata tableMetadata =
+          new StaticTableOperations(versionFilePath, table.io()).current();
+
+      result.toRewrite().addAll(tableMetadata.snapshots());
+      result.copyPlan().add(rewriteVersionFile(tableMetadata, versionFilePath));
+    }
+
+    return result;
+  }
+
+  private Pair<String, String> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
+    String stagingPath = stagingPath(versionFilePath, stagingDir);
+    TableMetadata newTableMetadata =
+        TableMetadataUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
+    TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
+    return Pair.of(stagingPath, newPath(versionFilePath, sourcePrefix, targetPrefix));
+  }
+
+  /**
+   * Rewrite a manifest list representing a snapshot.
+   *
+   * @param snapshot snapshot represented by the manifest list
+   * @param tableMetadata metadata of table
+   */
+  private RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot, TableMetadata tableMetadata, Set<String> manifestsToRewrite) {
+    RewriteResult<ManifestFile> result = new RewriteResult<>();
+    List<ManifestFile> manifestFiles = manifestFilesInSnapshot(snapshot);
+    String path = snapshot.manifestListLocation();
+    String stagingPath = stagingPath(path, stagingDir);
+    OutputFile outputFile = table.io().newOutputFile(stagingPath);
+    try (FileAppender<ManifestFile> writer =
+        ManifestLists.write(
+            tableMetadata.formatVersion(),
+            outputFile,
+            snapshot.snapshotId(),
+            snapshot.parentId(),
+            snapshot.sequenceNumber())) {
+
+      for (ManifestFile file : manifestFiles) {
+        Preconditions.checkArgument(
+            file.path().startsWith(sourcePrefix),
+            "Encountered manifest file %s not under the source prefix %s",
+            file.path(),
+            sourcePrefix);
+
+        ManifestFile newFile = file.copy();
+        ((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, targetPrefix));
+        writer.add(newFile);
+
+        // return the ManifestFile object for subsequent rewriting
+        if (manifestsToRewrite.contains(file.path())) {
+          result.toRewrite().add(file);
+          result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path()));
+        }
+      }
+
+      result.copyPlan().add(Pair.of(stagingPath, newPath(path, sourcePrefix, targetPrefix)));
+      return result;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to rewrite the manifest list file " + path, e);
+    }
+  }
+
+  private List<ManifestFile> manifestFilesInSnapshot(Snapshot snapshot) {
+    String path = snapshot.manifestListLocation();
+    List<ManifestFile> manifestFiles = Lists.newLinkedList();
+    try {
+      manifestFiles = ManifestLists.read(table.io().newInputFile(path));
+    } catch (RuntimeIOException e) {
+      LOG.warn("Failed to read manifest list {}", path, e);
+    }
+    return manifestFiles;
+  }
+
+  private Set<String> manifestsToRewrite(Set<Snapshot> diffSnapshots, TableMetadata startMetadata) {
+    try {
+      Table endStaticTable = newStaticTable(endVersionName, table.io());
+      Dataset<Row> lastVersionFiles = manifestDS(endStaticTable).select("path");
+      if (startMetadata == null) {
+        return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+      } else {
+        Set<Long> diffSnapshotIds =
+            diffSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+        return Sets.newHashSet(
+            lastVersionFiles
+                .distinct()
+                .filter(functions.column("added_snapshot_id").isInCollection(diffSnapshotIds))
+                .as(Encoders.STRING())
+                .collectAsList());
+      }
+    } catch (Exception e) {
+      throw new UnsupportedOperationException(
+          "Failed to build the manifest files dataframe, the end version you are "
+              + "trying to copy may contain invalid snapshots, please use a newer version that doesn't have invalid "
+              + "snapshots",
+          e);
+    }
+  }
+
+  /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
+  private Set<Pair<String, String>> rewriteManifests(
+      TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
+    if (toRewrite.isEmpty()) {
+      return Sets.newHashSet();
+    }
+
+    Encoder<ManifestFile> manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class);
+    Dataset<ManifestFile> manifestDS =
+        spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder);
+
+    Broadcast<Table> serializableTable = sparkContext().broadcast(SerializableTable.copyOf(table));
+    Broadcast<Map<Integer, PartitionSpec>> specsById =
+        sparkContext().broadcast(tableMetadata.specsById());
+
+    List<Tuple2<String, String>> dataFiles =
+        manifestDS
+            .repartition(toRewrite.size())
+            .mapPartitions(
+                toManifests(
+                    serializableTable,
+                    stagingDir,
+                    tableMetadata.formatVersion(),
+                    specsById,
+                    sourcePrefix,
+                    targetPrefix),
+                Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+            .collectAsList();
+
+    // duplicates are expected here as the same data file can have different statuses
+    // (e.g. added and deleted)
+    return dataFiles.stream().map(t -> Pair.of(t._1(), t._2())).collect(Collectors.toSet());
+  }
+
+  private static MapPartitionsFunction<ManifestFile, Tuple2<String, String>> toManifests(
+      Broadcast<Table> tableBroadcast,
+      String stagingLocation,
+      int format,
+      Broadcast<Map<Integer, PartitionSpec>> specsById,
+      String sourcePrefix,
+      String targetPrefix) {
+
+    return rows -> {
+      List<Tuple2<String, String>> files = Lists.newArrayList();
+      while (rows.hasNext()) {
+        ManifestFile manifestFile = rows.next();
+        switch (manifestFile.content()) {
+          case DATA:
+            files.addAll(
+                writeDataManifest(
+                    manifestFile,
+                    tableBroadcast,
+                    stagingLocation,
+                    format,
+                    specsById,
+                    sourcePrefix,
+                    targetPrefix));
+            break;
+          case DELETES:
+            files.addAll(
+                writeDeleteManifest(
+                    manifestFile,
+                    tableBroadcast,
+                    stagingLocation,
+                    format,
+                    specsById,
+                    sourcePrefix,
+                    targetPrefix));
+            break;
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported manifest type: " + manifestFile.content());
+        }
+      }
+      return files.iterator();
+    };
+  }
+
+  private static List<Tuple2<String, String>> writeDataManifest(
+      ManifestFile manifestFile,
+      Broadcast<Table> tableBroadcast,
+      String stagingLocation,
+      int format,
+      Broadcast<Map<Integer, PartitionSpec>> specsByIdBroadcast,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    String stagingPath = stagingPath(manifestFile.path(), stagingLocation);
+    FileIO io = tableBroadcast.getValue().io();
+    OutputFile outputFile = io.newOutputFile(stagingPath);
+    Map<Integer, PartitionSpec> specsById = specsByIdBroadcast.getValue();
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+
+    return RewriteTablePathUtil.rewriteManifest(
+            io, format, spec, outputFile, manifestFile, specsById, sourcePrefix, targetPrefix)
+        .stream()
+        .map(p -> Tuple2.apply(p.first(), p.second()))
+        .collect(Collectors.toList());
+  }
+
+  private static List<Tuple2<String, String>> writeDeleteManifest(
+      ManifestFile manifestFile,
+      Broadcast<Table> tableBroadcast,
+      String stagingLocation,
+      int format,
+      Broadcast<Map<Integer, PartitionSpec>> specsByIdBroadcast,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    String stagingPath = stagingPath(manifestFile.path(), stagingLocation);
+    FileIO io = tableBroadcast.getValue().io();
+    OutputFile outputFile = io.newOutputFile(stagingPath);
+    Map<Integer, PartitionSpec> specsById = specsByIdBroadcast.getValue();
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    RewriteTablePathUtil.PositionDeleteReaderWriter posDeleteReaderWriter =
+        new RewriteTablePathUtil.PositionDeleteReaderWriter() {
+          @Override
+          public CloseableIterable<Record> reader(
+              InputFile inputFile, FileFormat format, PartitionSpec spec) {
+            return positionDeletesReader(inputFile, format, spec);
+          }
+
+          @Override
+          public PositionDeleteWriter<Record> writer(
+              OutputFile outputFile,
+              FileFormat format,
+              PartitionSpec spec,
+              StructLike partition,
+              Schema rowSchema)
+              throws IOException {
+            return positionDeletesWriter(outputFile, format, spec, partition, rowSchema);
+          }
+        };
+    return RewriteTablePathUtil.rewriteDeleteManifest(
+            io,
+            format,
+            spec,
+            outputFile,
+            manifestFile,
+            specsById,
+            sourcePrefix,
+            targetPrefix,
+            stagingLocation,
+            posDeleteReaderWriter)
+        .stream()
+        .map(p -> Tuple2.apply(p.first(), p.second()))
+        .collect(Collectors.toList());
+  }
+
+  private static CloseableIterable<Record> positionDeletesReader(
+      InputFile inputFile, FileFormat format, PartitionSpec spec) {
+    Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(spec.schema());
+    switch (format) {
+      case AVRO:
+        return Avro.read(inputFile)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(DataReader::create)
+            .build();
+
+      case PARQUET:
+        return Parquet.read(inputFile)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(
+                fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
+            .build();
+
+      case ORC:
+        return ORC.read(inputFile)
+            .project(deleteSchema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
+            .build();
+
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + format);
+    }
+  }
+
+  private static PositionDeleteWriter<Record> positionDeletesWriter(
+      OutputFile outputFile,
+      FileFormat format,
+      PartitionSpec spec,
+      StructLike partition,
+      Schema rowSchema)
+      throws IOException {
+    switch (format) {
+      case AVRO:
+        return Avro.writeDeletes(outputFile)
+            .createWriterFunc(DataWriter::create)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      case PARQUET:
+        return Parquet.writeDeletes(outputFile)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      case ORC:
+        return ORC.writeDeletes(outputFile)
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + format);
+    }
+  }
+
+  private Set<Snapshot> snapshotSet(TableMetadata metadata) {
+    if (metadata == null) {
+      return Sets.newHashSet();
+    } else {
+      return Sets.newHashSet(metadata.snapshots());
+    }
+  }
+
+  private boolean fileExist(String path) {
+    if (path == null || path.trim().isEmpty()) {
+      return false;
+    }
+    return table.io().newInputFile(path).exists();
+  }
+
+  private static String relativize(String path, String prefix) {
+    String toRemove = prefix;
+    if (!toRemove.endsWith("/")) {
+      toRemove += "/";
+    }
+    if (!path.startsWith(toRemove)) {
+          String.format("Path %s does not start with %s", path, toRemove));
+    }
+    return path.substring(toRemove.length());
+  }
+
+  private static String newPath(String path, String sourcePrefix, String targetPrefix) {
+    return combinePaths(targetPrefix, relativize(path, sourcePrefix));
+  }
+
+  private static String stagingPath(String originalPath, String stagingLocation) {
+    return stagingLocation + fileName(originalPath);
+  }
+
+  private static String combinePaths(String absolutePath, String relativePath) {
+    String combined = absolutePath;
+    if (!combined.endsWith("/")) {
+      combined += "/";
+    }
+    combined += relativePath;
+    return combined;
+  }
+
+  private static String fileName(String path) {
+    String filename = path;
+    int lastIndex = path.lastIndexOf(File.separator);
+    if (lastIndex != -1) {
+      filename = path.substring(lastIndex + 1);
+    }
+    return filename;
+  }

Review Comment:
   Could we reuse the same helper methods from `RewriteTablePathUtil` instead of duplicating them here?
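   For example (a sketch, assuming the path helpers are, or are made, public static methods on `RewriteTablePathUtil`; that visibility is an assumption, not the current code):

```java
// Delegate to the shared util instead of duplicating the helpers in this action.
private static String newPath(String path, String sourcePrefix, String targetPrefix) {
  return RewriteTablePathUtil.newPath(path, sourcePrefix, targetPrefix);
}

private static String stagingPath(String originalPath, String stagingLocation) {
  return RewriteTablePathUtil.stagingPath(originalPath, stagingLocation);
}

private static String fileName(String path) {
  return RewriteTablePathUtil.fileName(path);
}
```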



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

