aokolnychyi commented on code in PR #9020: URL: https://github.com/apache/iceberg/pull/9020#discussion_r1388792770
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java:
##########

@@ -215,41 +240,45 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
         .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
   }
 
-  private List<ManifestFile> writeManifestsForUnpartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests) {
-
-    StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
-    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-    Types.StructType partitionType = spec.partitionType();
+  private List<ManifestFile> writeUnpartitionedManifests(
+      ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
 
-    return manifestEntryDF
-        .repartition(numManifests)
-        .mapPartitions(
-            toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
-            manifestEncoder)
-        .collectAsList();
+    WriteManifests<?> writeFunc = newWriteManifestsFunc(content, manifestEntryDF.schema());
+    Dataset<Row> transformedManifestEntryDF = manifestEntryDF.repartition(numManifests);
+    return writeFunc.apply(transformedManifestEntryDF).collectAsList();
   }
 
-  private List<ManifestFile> writeManifestsForPartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests) {
-
-    StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
-    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-    Types.StructType partitionType = spec.partitionType();
+  private List<ManifestFile> writePartitionedManifests(
+      ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
 
     return withReusableDS(
         manifestEntryDF,
         df -> {
+          WriteManifests<?> writeFunc = newWriteManifestsFunc(content, df.schema());
           Column partitionColumn = df.col("data_file.partition");
-          return df.repartitionByRange(numManifests, partitionColumn)
-              .sortWithinPartitions(partitionColumn)
-              .mapPartitions(
-                  toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
-                  manifestEncoder)
-              .collectAsList();
+          Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn, numManifests);
+          return writeFunc.apply(transformedDF).collectAsList();
         });
   }
 
+  private WriteManifests<?> newWriteManifestsFunc(ManifestContent content, StructType sparkType) {
+    ManifestWriterFactory writers = manifestWriters();
+
+    StructType sparkFileType = (StructType) sparkType.apply("data_file").dataType();

Review Comment:
   Yeah, we always use the `data_file` struct in the manifest entry, even for deletes.
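   For context, a minimal sketch of the lookup that `newWriteManifestsFunc` performs: the file struct is read from a field named `data_file` in the manifest-entry schema, regardless of whether the manifests carry data or delete files. The class name and the trimmed-down schemas below are illustrative assumptions, not the real manifest-entry schema.

   ```java
   import org.apache.spark.sql.types.DataTypes;
   import org.apache.spark.sql.types.Metadata;
   import org.apache.spark.sql.types.StructField;
   import org.apache.spark.sql.types.StructType;

   // Hypothetical standalone example; only needs spark-sql on the classpath.
   public class ManifestEntrySchemaSketch {

     public static void main(String[] args) {
       // Illustrative file struct; the real data_file struct has many more fields.
       StructType fileType =
           new StructType(
               new StructField[] {
                 new StructField("content", DataTypes.IntegerType, false, Metadata.empty()),
                 new StructField("file_path", DataTypes.StringType, false, Metadata.empty())
               });

       // Illustrative manifest-entry schema: the file struct sits under "data_file"
       // for both data and delete manifests.
       StructType manifestEntryType =
           new StructType(
               new StructField[] {
                 new StructField("snapshot_id", DataTypes.LongType, true, Metadata.empty()),
                 new StructField("sequence_number", DataTypes.LongType, true, Metadata.empty()),
                 new StructField("file_sequence_number", DataTypes.LongType, true, Metadata.empty()),
                 new StructField("data_file", fileType, false, Metadata.empty())
               });

       // Same lookup as in newWriteManifestsFunc above: the field name does not
       // depend on the manifest content type.
       StructType sparkFileType = (StructType) manifestEntryType.apply("data_file").dataType();
       System.out.println(sparkFileType.treeString());
     }
   }
   ```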