aokolnychyi commented on code in PR #9020:
URL: https://github.com/apache/iceberg/pull/9020#discussion_r1388792770
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java:
##########
@@ -215,41 +240,45 @@ private Dataset<Row>
buildManifestEntryDF(List<ManifestFile> manifests) {
.select("snapshot_id", "sequence_number", "file_sequence_number",
"data_file");
}
- private List<ManifestFile> writeManifestsForUnpartitionedTable(
- Dataset<Row> manifestEntryDF, int numManifests) {
-
- StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
- Types.StructType combinedPartitionType = Partitioning.partitionType(table);
- Types.StructType partitionType = spec.partitionType();
+ private List<ManifestFile> writeUnpartitionedManifests(
+ ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests)
{
- return manifestEntryDF
- .repartition(numManifests)
- .mapPartitions(
- toManifests(manifestWriters(), combinedPartitionType,
partitionType, sparkType),
- manifestEncoder)
- .collectAsList();
+ WriteManifests<?> writeFunc = newWriteManifestsFunc(content,
manifestEntryDF.schema());
+ Dataset<Row> transformedManifestEntryDF =
manifestEntryDF.repartition(numManifests);
+ return writeFunc.apply(transformedManifestEntryDF).collectAsList();
}
- private List<ManifestFile> writeManifestsForPartitionedTable(
- Dataset<Row> manifestEntryDF, int numManifests) {
-
- StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
- Types.StructType combinedPartitionType = Partitioning.partitionType(table);
- Types.StructType partitionType = spec.partitionType();
+ private List<ManifestFile> writePartitionedManifests(
+ ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests)
{
return withReusableDS(
manifestEntryDF,
df -> {
+ WriteManifests<?> writeFunc = newWriteManifestsFunc(content,
df.schema());
Column partitionColumn = df.col("data_file.partition");
- return df.repartitionByRange(numManifests, partitionColumn)
- .sortWithinPartitions(partitionColumn)
- .mapPartitions(
- toManifests(manifestWriters(), combinedPartitionType,
partitionType, sparkType),
- manifestEncoder)
- .collectAsList();
+ Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn,
numManifests);
+ return writeFunc.apply(transformedDF).collectAsList();
});
}
+ private WriteManifests<?> newWriteManifestsFunc(ManifestContent content,
StructType sparkType) {
+ ManifestWriterFactory writers = manifestWriters();
+
+ StructType sparkFileType = (StructType)
sparkType.apply("data_file").dataType();
Review Comment:
Yeah, we always use the `data_file` struct in the manifest entry, even for
deletes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]