dramaticlly commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2252813538
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -415,54 +421,85 @@ private Set<Pair<String, String>> statsFileCopyPlan(
* @param snapshot snapshot represented by the manifest list
* @param tableMetadata metadata of table
* @param manifestsToRewrite filter of manifests to rewrite.
- * @return a result including a copy plan for the manifests contained in the
manifest list, as
- * well as for the manifest list itself
+ * @return a result including a copy plan for the manifest list itself
*/
private RewriteResult<ManifestFile> rewriteManifestList(
- Snapshot snapshot, TableMetadata tableMetadata, Set<String>
manifestsToRewrite) {
+ Snapshot snapshot,
+ TableMetadata tableMetadata,
+ Map<String, RewrittenFileInfo> rewrittenManifests) {
RewriteResult<ManifestFile> result = new RewriteResult<>();
String path = snapshot.manifestListLocation();
String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
- RewriteResult<ManifestFile> rewriteResult =
- RewriteTablePathUtil.rewriteManifestList(
- snapshot,
- table.io(),
- tableMetadata,
- manifestsToRewrite,
- sourcePrefix,
- targetPrefix,
- stagingDir,
- outputPath);
-
- result.append(rewriteResult);
+ RewriteTablePathUtil.rewriteManifestList(
+ snapshot,
+ table.io(),
+ tableMetadata,
+ rewrittenManifests,
+ sourcePrefix,
+ targetPrefix,
+ stagingDir,
+ outputPath);
+
// add the manifest list copy plan itself to the result
result
.copyPlan()
.add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path,
sourcePrefix, targetPrefix)));
return result;
}
- private Set<String> manifestsToRewrite(
+ private Set<ManifestFile> manifestsToRewrite(
Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
try {
Table endStaticTable = newStaticTable(endVersionName, table.io());
- Dataset<Row> lastVersionFiles =
manifestDS(endStaticTable).select("path");
+ Dataset<Row> allManifestsDF = manifestDS(endStaticTable).select("path");
+ Set<String> expectedManifestPaths;
+
if (startMetadata == null) {
- return
Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+ expectedManifestPaths =
+
Sets.newHashSet(allManifestsDF.distinct().as(Encoders.STRING()).collectAsList());
} else {
Set<Long> deltaSnapshotIds =
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
- return Sets.newHashSet(
- lastVersionFiles
- .distinct()
- .filter(
- functions
- .column(ManifestFile.SNAPSHOT_ID.name())
- .isInCollection(deltaSnapshotIds))
- .as(Encoders.STRING())
- .collectAsList());
+ expectedManifestPaths =
+ Sets.newHashSet(
+ allManifestsDF
+ .distinct()
+ .filter(
+ functions
+ .column(ManifestFile.SNAPSHOT_ID.name())
+ .isInCollection(deltaSnapshotIds))
+ .as(Encoders.STRING())
+ .collectAsList());
}
+
+ Set<ManifestFile> foundManifests =
+ deltaSnapshots.stream()
+ .flatMap(
+ s -> {
+ try {
+ return s.allManifests(table.io()).stream();
+ } catch (NotFoundException e) {
+ LOG.warn(
+ "Skipping snapshot {} as its manifest list is
missing (likely expired).",
+ s.snapshotId(),
+ e);
+ return Stream.empty();
+ }
+ })
+ .collect(Collectors.toSet());
+
+ Set<String> foundManifestPaths =
+
foundManifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
+ Set<String> missingPaths = Sets.difference(expectedManifestPaths,
foundManifestPaths);
+ Preconditions.checkState(
+ missingPaths.isEmpty(),
+ "Could not find all expected manifests. Missing files: %s",
+ String.join(", ", missingPaths));
+
+ return foundManifests.stream()
+ .filter(m -> expectedManifestPaths.contains(m.path()))
+ .collect(Collectors.toSet());
Review Comment:
yeah I think we might want the same thing?
From what I can tell, the [`.all_manifests`
table](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/AllManifestsTable.java)
includes all the valid manifest files that are referenced from any snapshot
currently tracked by the
table, so if we apply the filter on referenced_snapshot with
deltaSnapshotIds, it should give us the same result as `foundManifests`. I am
wondering whether we could use ManifestEncoders on the returned `Dataset<Row>` and
collect the result as a list (or a set) of manifestsToRewrite.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]