dramaticlly commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2252698558
##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -87,6 +87,24 @@ public Set<Pair<String, String>> copyPlan() {
}
}
+ public static class RewrittenFileInfo implements Serializable {
Review Comment:
Since this is in Iceberg core, and this class is only used as a record in the static result class and in the SparkAction, I am wondering if we want to define it here.
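For illustration, a minimal sketch of the alternative I have in mind (the placement and accessor names are assumptions; the fields mirror what this PR appears to store):
```java
import java.io.Serializable;

public class RewriteTablePathSparkAction {

  // Hypothetical placement: keep the holder nested next to its only consumer
  // instead of defining it in core's RewriteTablePathUtil.
  public static class RewrittenFileInfo implements Serializable {
    private final String newPath;
    private final long length;

    RewrittenFileInfo(String newPath, long length) {
      this.newPath = newPath;
      this.length = length;
    }

    public String getNewPath() {
      return newPath;
    }

    public long getLength() {
      return length;
    }
  }
}
```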
##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -357,6 +438,55 @@ public static RewriteResult<DataFile> rewriteDataManifest(
}
}
+
+ /**
+ * Rewrite a data manifest, replacing path references.
+ *
+ * @param manifestFile source manifest file to rewrite
+ * @param snapshotIds snapshot ids for filtering returned data manifest entries
+ * @param outputFile output file to rewrite manifest file to
+ * @param io file io
+ * @param format format of the manifest file
+ * @param specsById map of partition specs by id
+ * @param sourcePrefix source prefix that will be replaced
+ * @param targetPrefix target prefix that will replace it
+ * @return rewritten manifest file and a copy plan for the referenced content files
+ */
+ public static Pair<ManifestFile, RewriteResult<DataFile>> rewriteDataManifestWithResult(
+ ManifestFile manifestFile,
+ Set<Long> snapshotIds,
+ OutputFile outputFile,
+ FileIO io,
+ int format,
+ Map<Integer, PartitionSpec> specsById,
+ String sourcePrefix,
+ String targetPrefix)
+ throws IOException {
+ PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+ ManifestWriter<DataFile> writer =
+ ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
+ RewriteResult<DataFile> rewriteResult = null;
+
+ try (ManifestWriter<DataFile> dataManifestWriter = writer;
+ ManifestReader<DataFile> reader =
+ ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
+ rewriteResult =
+ StreamSupport.stream(reader.entries().spliterator(), false)
+ .map(
+ entry ->
+ writeDataFileEntry(
+ entry,
+ snapshotIds,
+ spec,
+ sourcePrefix,
+ targetPrefix,
+ writer))
+ .reduce(new RewriteResult<>(), RewriteResult::append);
+ }
+ return Pair.of(writer.toManifestFile(), rewriteResult);
Review Comment:
Looks like we are returning the entire new ManifestFile on top of the list of data files in the copy plan, but later in the SparkAction we only use its new location and length. Can we do better memory-wise?
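For example, a rough sketch of a leaner variant (the method name and the `RewrittenFileInfo(path, length)` constructor are my assumptions; this would sit in `RewriteTablePathUtil` and reuse its `writeDataFileEntry` helper):
```java
public static Pair<RewrittenFileInfo, RewriteResult<DataFile>> rewriteDataManifestSlim(
    ManifestFile manifestFile,
    Set<Long> snapshotIds,
    OutputFile outputFile,
    FileIO io,
    int format,
    Map<Integer, PartitionSpec> specsById,
    String sourcePrefix,
    String targetPrefix)
    throws IOException {
  PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
  ManifestWriter<DataFile> writer =
      ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
  RewriteResult<DataFile> rewriteResult;

  try (ManifestWriter<DataFile> dataManifestWriter = writer;
      ManifestReader<DataFile> reader =
          ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
    rewriteResult =
        StreamSupport.stream(reader.entries().spliterator(), false)
            .map(
                entry ->
                    writeDataFileEntry(
                        entry, snapshotIds, spec, sourcePrefix, targetPrefix, writer))
            .reduce(new RewriteResult<>(), RewriteResult::append);
  }

  // toManifestFile() is still materialized once to learn the written length,
  // but only the new path and length escape to the caller.
  ManifestFile rewritten = writer.toManifestFile();
  return Pair.of(new RewrittenFileInfo(rewritten.path(), rewritten.length()), rewriteResult);
}
```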
##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -274,6 +294,65 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
}
}
+ /**
+ * Rewrite a manifest list representing a snapshot, replacing path references.
+ * @param snapshot snapshot represented by the manifest list
+ * @param io file io
+ * @param tableMetadata metadata of table
+ * @param rewrittenManifests information about rewritten manifest files
+ * @param sourcePrefix source prefix that will be replaced
+ * @param targetPrefix target prefix that will replace it
+ * @param stagingDir staging directory
+ * @param outputPath location to write the manifest list
+ */
+ public static void rewriteManifestList(
+ Snapshot snapshot,
+ FileIO io,
+ TableMetadata tableMetadata,
+ Map<String, RewrittenFileInfo> rewrittenManifests,
+ String sourcePrefix,
+ String targetPrefix,
+ String stagingDir,
Review Comment:
I believe this `stagingDir` is no longer used, since the path now comes from `rewrittenManifests.get(file.path()).getNewPath()`.
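i.e. the signature could probably be trimmed to something like this (sketch only; body elided):
```java
public static void rewriteManifestList(
    Snapshot snapshot,
    FileIO io,
    TableMetadata tableMetadata,
    Map<String, RewrittenFileInfo> rewrittenManifests,
    String sourcePrefix,
    String targetPrefix,
    String outputPath) {
  // body unchanged from the PR: each manifest's staged location is already
  // resolved via rewrittenManifests.get(file.path()).getNewPath(), so no
  // stagingDir / stagingPath(...) computation is needed here
}
```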
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -415,54 +421,85 @@ private Set<Pair<String, String>> statsFileCopyPlan(
* @param snapshot snapshot represented by the manifest list
* @param tableMetadata metadata of table
* @param manifestsToRewrite filter of manifests to rewrite.
- * @return a result including a copy plan for the manifests contained in the manifest list, as
- * well as for the manifest list itself
+ * @return a result including a copy plan for the manifest list itself
*/
private RewriteResult<ManifestFile> rewriteManifestList(
- Snapshot snapshot, TableMetadata tableMetadata, Set<String> manifestsToRewrite) {
+ Snapshot snapshot,
+ TableMetadata tableMetadata,
+ Map<String, RewrittenFileInfo> rewrittenManifests) {
RewriteResult<ManifestFile> result = new RewriteResult<>();
String path = snapshot.manifestListLocation();
String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
- RewriteResult<ManifestFile> rewriteResult =
- RewriteTablePathUtil.rewriteManifestList(
- snapshot,
- table.io(),
- tableMetadata,
- manifestsToRewrite,
- sourcePrefix,
- targetPrefix,
- stagingDir,
- outputPath);
-
- result.append(rewriteResult);
+ RewriteTablePathUtil.rewriteManifestList(
+ snapshot,
+ table.io(),
+ tableMetadata,
+ rewrittenManifests,
+ sourcePrefix,
+ targetPrefix,
+ stagingDir,
+ outputPath);
+
// add the manifest list copy plan itself to the result
result
.copyPlan()
.add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path, sourcePrefix, targetPrefix)));
return result;
}
- private Set<String> manifestsToRewrite(
+ private Set<ManifestFile> manifestsToRewrite(
Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
try {
Table endStaticTable = newStaticTable(endVersionName, table.io());
- Dataset<Row> lastVersionFiles = manifestDS(endStaticTable).select("path");
+ Dataset<Row> allManifestsDF = manifestDS(endStaticTable).select("path");
+ Set<String> expectedManifestPaths;
+
if (startMetadata == null) {
- return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+ expectedManifestPaths =
+ Sets.newHashSet(allManifestsDF.distinct().as(Encoders.STRING()).collectAsList());
} else {
Set<Long> deltaSnapshotIds =
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
- return Sets.newHashSet(
- lastVersionFiles
- .distinct()
- .filter(
- functions
- .column(ManifestFile.SNAPSHOT_ID.name())
- .isInCollection(deltaSnapshotIds))
- .as(Encoders.STRING())
- .collectAsList());
+ expectedManifestPaths =
+ Sets.newHashSet(
+ allManifestsDF
+ .distinct()
+ .filter(
+ functions
+ .column(ManifestFile.SNAPSHOT_ID.name())
+ .isInCollection(deltaSnapshotIds))
+ .as(Encoders.STRING())
+ .collectAsList());
}
+
+ Set<ManifestFile> foundManifests =
+ deltaSnapshots.stream()
+ .flatMap(
+ s -> {
+ try {
+ return s.allManifests(table.io()).stream();
+ } catch (NotFoundException e) {
+ LOG.warn(
+ "Skipping snapshot {} as its manifest list is
missing (likely expired).",
+ s.snapshotId(),
+ e);
+ return Stream.empty();
+ }
+ })
+ .collect(Collectors.toSet());
+
+ Set<String> foundManifestPaths =
+ foundManifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
+ Set<String> missingPaths = Sets.difference(expectedManifestPaths, foundManifestPaths);
+ Preconditions.checkState(
+ missingPaths.isEmpty(),
+ "Could not find all expected manifests. Missing files: %s",
+ String.join(", ", missingPaths));
+
+ return foundManifests.stream()
+ .filter(m -> expectedManifestPaths.contains(m.path()))
+ .collect(Collectors.toSet());
Review Comment:
If you are looking for a set of manifests filtered by snapshot ids, I think this can probably be replaced with something similar to https://github.com/apache/iceberg/blob/main/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L182-L190.
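Roughly along these lines (a sketch of that pattern; `loadMetadataTable` is the existing `BaseSparkAction` helper, and `reference_snapshot_id` is the `all_manifests` column I assume it filters on):
```java
// Filter the all_manifests metadata table by snapshot ids on the executors,
// instead of walking Snapshot#allManifests per snapshot on the driver.
Set<Long> deltaSnapshotIds =
    deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
Dataset<Row> manifestDF = loadMetadataTable(table, MetadataTableType.ALL_MANIFESTS);
Dataset<Row> filtered =
    manifestDF.filter(
        functions.column("reference_snapshot_id").isInCollection(deltaSnapshotIds));
Set<String> expectedManifestPaths =
    Sets.newHashSet(
        filtered.select("path").distinct().as(Encoders.STRING()).collectAsList());
```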
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -491,77 +528,128 @@ public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
}
}
- /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
- private RewriteContentFileResult rewriteManifests(
+ public static class ManifestsRewriteResult {
+ private final RewriteContentFileResult contentFileResult;
+ private final Map<String, RewrittenFileInfo> rewrittenManifests;
+
+ ManifestsRewriteResult(
+ RewriteContentFileResult contentFileResult,
+ Map<String, RewrittenFileInfo> rewrittenManifests) {
+ this.contentFileResult = contentFileResult;
+ this.rewrittenManifests = rewrittenManifests;
+ }
+
+ public RewriteContentFileResult getContentFileResult() {
+ return contentFileResult;
+ }
+
+ public Map<String, RewrittenFileInfo> getRewrittenManifests() {
+ return rewrittenManifests;
+ }
+ }
Review Comment:
I believe we also need something similar for position deletes, where we need to write the delete files before writing the delete manifests, so I am wondering if we can generalize this result class to take both into account.
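For example, something along these lines (a sketch; the name and the second map are my assumptions):
```java
// Generalized holder: carries the content-file copy plan plus the rewritten
// file info for whichever layer had to be written first (manifests now,
// position delete files later).
public static class FilesRewriteResult {
  private final RewriteContentFileResult contentFileResult;
  private final Map<String, RewrittenFileInfo> rewrittenManifests;
  private final Map<String, RewrittenFileInfo> rewrittenDeleteFiles;

  FilesRewriteResult(
      RewriteContentFileResult contentFileResult,
      Map<String, RewrittenFileInfo> rewrittenManifests,
      Map<String, RewrittenFileInfo> rewrittenDeleteFiles) {
    this.contentFileResult = contentFileResult;
    this.rewrittenManifests = rewrittenManifests;
    this.rewrittenDeleteFiles = rewrittenDeleteFiles;
  }

  public RewriteContentFileResult getContentFileResult() {
    return contentFileResult;
  }

  public Map<String, RewrittenFileInfo> getRewrittenManifests() {
    return rewrittenManifests;
  }

  public Map<String, RewrittenFileInfo> getRewrittenDeleteFiles() {
    return rewrittenDeleteFiles;
  }
}
```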
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -282,23 +287,25 @@ private String rebuildMetadata() {
RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
- Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
- Set<Snapshot> validSnapshots =
- Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+ // rebuild manifest files
+ Set<ManifestFile> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
+ ManifestsRewriteResult rewriteManifestResult =
+ rewriteManifests(deltaSnapshots, endMetadata, manifestsToRewrite);
Review Comment:
Just some notes about the behavior change here and its potential performance implications.

Before this change, the rewrite was done top-down in one pass: version-file -> manifest-list -> parallelized rewrite of manifest files as well as position deletes. That order is optimized to rewrite the top layer first, discover the subset of the bottom layer that needs to be rewritten, and gradually expand.

However, in order to get the sizes right, we need to reverse some of the rewrite ordering by rewriting the manifests first and the manifest list later.
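For reference, the reordered flow with names from this diff looks roughly like this (sketch; iterating the snapshots for the manifest lists is my assumption):
```java
// New ordering: manifests are rewritten first so their new lengths are known,
// and only then is each snapshot's manifest list rewritten from that map.
Set<ManifestFile> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
ManifestsRewriteResult rewriteManifestResult =
    rewriteManifests(deltaSnapshots, endMetadata, manifestsToRewrite);
for (Snapshot snapshot : deltaSnapshots) {
  rewriteManifestList(snapshot, endMetadata, rewriteManifestResult.getRewrittenManifests());
}
```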
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]