stevenzwu commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2291756180
##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -85,6 +86,14 @@ public Set<T> toRewrite() {
public Set<Pair<String, String>> copyPlan() {
return copyPlan;
}
+
+ public Long size() {
+ return size;
+ }
+
+ public void setSize(long newSize) {
Review Comment:
nit: Iceberg style skips the `set` prefix, so this would be `void size(long newSize)`.
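A minimal sketch of that convention (hypothetical class for illustration, not the PR's actual code):

```java
// Hypothetical holder illustrating the prefix-free accessor style:
// getter and setter both use the bare field name, no get/set prefixes.
class SizedResult {
  private Long size;

  public Long size() {
    return size;
  }

  public void size(long newSize) {
    this.size = newSize;
  }
}
```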
##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -276,6 +287,62 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
}
}
+ /**
+ * Rewrite a manifest list representing a snapshot, replacing path references.
+ *
+ * @param snapshot snapshot represented by the manifest list
+ * @param io file io
+ * @param tableMetadata metadata of table
+ * @param rewrittenManifestLengths rewritten manifest files and their sizes
+ * @param sourcePrefix source prefix that will be replaced
+ * @param targetPrefix target prefix that will replace it
+ * @param outputPath location to write the manifest list
+ */
+ public static void rewriteManifestList(
Review Comment:
I know you mentioned that `RewriteResult` doesn't fit the manifest list rewrite, but a `void` return type would also be inconsistent with the other `rewrite` methods in this class.
What do you think about moving this into `RewriteTablePathSparkAction`? This method looks similar to the `rewriteVersionFile` method in the Spark class.
That way, we have one less public API to worry about when we refactor this non-intuitive rewrite result.
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -417,55 +424,37 @@ private Set<Pair<String, String>> statsFileCopyPlan(
*
* @param snapshot snapshot represented by the manifest list
* @param tableMetadata metadata of table
- * @param manifestsToRewrite filter of manifests to rewrite.
- * @return a result including a copy plan for the manifests contained in the manifest list, as
- *     well as for the manifest list itself
+ * @param rewrittenManifestLengths lengths of rewritten manifests
+ * @return a result including a copy plan for the manifest list itself
*/
private RewriteResult<ManifestFile> rewriteManifestList(
- Snapshot snapshot, TableMetadata tableMetadata, Set<String> manifestsToRewrite) {
+ Snapshot snapshot, TableMetadata tableMetadata, Map<String, Long> rewrittenManifestLengths) {
RewriteResult<ManifestFile> result = new RewriteResult<>();
String path = snapshot.manifestListLocation();
String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, stagingDir);
- RewriteResult<ManifestFile> rewriteResult =
- RewriteTablePathUtil.rewriteManifestList(
- snapshot,
- table.io(),
- tableMetadata,
- manifestsToRewrite,
- sourcePrefix,
- targetPrefix,
- stagingDir,
- outputPath);
-
- result.append(rewriteResult);
+ RewriteTablePathUtil.rewriteManifestList(
+ snapshot,
+ table.io(),
+ tableMetadata,
+ rewrittenManifestLengths,
+ sourcePrefix,
+ targetPrefix,
+ outputPath);
+
// add the manifest list copy plan itself to the result
result
.copyPlan()
.add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path, sourcePrefix, targetPrefix)));
return result;
}
- private Set<String> manifestsToRewrite(
- Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
+ private Set<ManifestFile> manifestsToRewrite(Set<Snapshot> liveSnapshots) {
Review Comment:
Previously this method extracted manifest files from the delta snapshots only. Now it returns manifest files from all live snapshots. Is the method name `manifestsToRewrite` still accurate?
##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -447,6 +551,58 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest(
}
}
+ /**
+ * Rewrite a delete manifest, replacing path references.
+ *
+ * @param manifestFile source delete manifest to rewrite
+ * @param snapshotIds snapshot ids for filtering returned delete manifest entries
+ * @param outputFile output file to rewrite manifest file to
+ * @param io file io
+ * @param format format of the manifest file
+ * @param specsById map of partition specs by id
+ * @param sourcePrefix source prefix that will be replaced
+ * @param targetPrefix target prefix that will replace it
+ * @param stagingLocation staging location for rewritten files (referred delete file will be
+ *     rewritten here)
+ * @return size of the resulting manifest file and a copy plan for the referenced content files
+ */
+ public static Pair<Long, RewriteResult<DeleteFile>> rewriteDeleteManifestWithResult(
Review Comment:
> the way RewriteResult is currently implemented and used is counterintuitive to me.

Definitely agree. A larger refactoring can really help, but that can probably be tackled as a follow-up.
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -494,36 +483,60 @@ public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
}
}
- /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
- private RewriteContentFileResult rewriteManifests(
+ /**
+ * Rewrite manifest files in a distributed manner and return the resulting manifests and content
+ * files selected for rewriting.
+ */
+ private Map<String, RewriteContentFileResult> rewriteManifests(
Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
if (toRewrite.isEmpty()) {
- return new RewriteContentFileResult();
+ return Maps.newHashMap();
}
Encoder<ManifestFile> manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class);
+ Encoder<RewriteContentFileResult> manifestResultEncoder =
+ Encoders.javaSerialization(RewriteContentFileResult.class);
+ Encoder<Tuple2<String, RewriteContentFileResult>> tupleEncoder =
+ Encoders.tuple(Encoders.STRING(), manifestResultEncoder);
+
Dataset<ManifestFile> manifestDS =
spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder);
Set<Long> deltaSnapshotIds =
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
- return manifestDS
- .repartition(toRewrite.size())
- .map(
- toManifests(
- tableBroadcast(),
- sparkContext().broadcast(deltaSnapshotIds),
- stagingDir,
- tableMetadata.formatVersion(),
- sourcePrefix,
- targetPrefix),
- Encoders.bean(RewriteContentFileResult.class))
- // duplicates are expected here as the same data file can have different statuses
- // (e.g. added and deleted)
- .reduce((ReduceFunction<RewriteContentFileResult>) RewriteContentFileResult::append);
- }
-
- private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
+ Iterator<Tuple2<String, RewriteContentFileResult>> resultIterator =
+ manifestDS
+ .repartition(toRewrite.size())
+ .map(
+ toManifests(
+ tableBroadcast(),
+ sparkContext().broadcast(deltaSnapshotIds),
+ stagingDir,
+ tableMetadata.formatVersion(),
+ sourcePrefix,
+ targetPrefix),
+ tupleEncoder)
+ .toLocalIterator();
Review Comment:
Will we have a memory concern with `toLocalIterator`?
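For context (my understanding of Spark's behavior): `toLocalIterator` brings results to the driver one partition at a time, so peak driver memory is roughly the largest partition's results rather than the whole dataset as with `collect()`, at the cost of launching a job per partition. A stdlib-only sketch of the driver-side accumulation pattern this enables (hypothetical names, no Spark dependency):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class DriverSideAccumulation {
  // Folds streamed (path, size) results into a map one entry at a time, the
  // same shape as consuming toLocalIterator on the driver: peak memory is the
  // merged map plus one in-flight entry, not the full collected dataset.
  static Map<String, Long> accumulate(Iterator<Map.Entry<String, Long>> results) {
    Map<String, Long> merged = new HashMap<>();
    while (results.hasNext()) {
      Map.Entry<String, Long> e = results.next();
      // duplicates merged by summing, standing in for an append-style reducer
      merged.merge(e.getKey(), e.getValue(), Long::sum);
    }
    return merged;
  }

  public static void main(String[] args) {
    Iterator<Map.Entry<String, Long>> fakeResults =
        List.of(
                Map.entry("manifest-a.avro", 100L),
                Map.entry("manifest-b.avro", 200L),
                Map.entry("manifest-a.avro", 50L))
            .iterator();
    System.out.println(accumulate(fakeResults).get("manifest-a.avro")); // 150
  }
}
```

Whether that headroom is enough probably depends on how large a single manifest's per-partition result can get.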
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -281,24 +281,32 @@ private String rebuildMetadata() {
// rebuild version files
RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
-
- Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
Set<Snapshot> validSnapshots =
Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+ // rebuild manifest files
+ Set<ManifestFile> manifestsToRewrite = manifestsToRewrite(validSnapshots);
Review Comment:
Previously, `manifestsToRewrite` contained manifest files from the delta snapshots only; now it contains manifest files from all live snapshots. Can you help me understand how the incremental rewrite is still maintained in this case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]