aokolnychyi commented on code in PR #9020:
URL: https://github.com/apache/iceberg/pull/9020#discussion_r1388792770


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java:
##########
@@ -215,41 +240,45 @@ private Dataset<Row> 
buildManifestEntryDF(List<ManifestFile> manifests) {
         .select("snapshot_id", "sequence_number", "file_sequence_number", 
"data_file");
   }
 
-  private List<ManifestFile> writeManifestsForUnpartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests) {
-
-    StructType sparkType = (StructType) 
manifestEntryDF.schema().apply("data_file").dataType();
-    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-    Types.StructType partitionType = spec.partitionType();
+  private List<ManifestFile> writeUnpartitionedManifests(
+      ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) 
{
 
-    return manifestEntryDF
-        .repartition(numManifests)
-        .mapPartitions(
-            toManifests(manifestWriters(), combinedPartitionType, 
partitionType, sparkType),
-            manifestEncoder)
-        .collectAsList();
+    WriteManifests<?> writeFunc = newWriteManifestsFunc(content, 
manifestEntryDF.schema());
+    Dataset<Row> transformedManifestEntryDF = 
manifestEntryDF.repartition(numManifests);
+    return writeFunc.apply(transformedManifestEntryDF).collectAsList();
   }
 
-  private List<ManifestFile> writeManifestsForPartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests) {
-
-    StructType sparkType = (StructType) 
manifestEntryDF.schema().apply("data_file").dataType();
-    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-    Types.StructType partitionType = spec.partitionType();
+  private List<ManifestFile> writePartitionedManifests(
+      ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) 
{
 
     return withReusableDS(
         manifestEntryDF,
         df -> {
+          WriteManifests<?> writeFunc = newWriteManifestsFunc(content, 
df.schema());
           Column partitionColumn = df.col("data_file.partition");
-          return df.repartitionByRange(numManifests, partitionColumn)
-              .sortWithinPartitions(partitionColumn)
-              .mapPartitions(
-                  toManifests(manifestWriters(), combinedPartitionType, 
partitionType, sparkType),
-                  manifestEncoder)
-              .collectAsList();
+          Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn, 
numManifests);
+          return writeFunc.apply(transformedDF).collectAsList();
         });
   }
 
+  private WriteManifests<?> newWriteManifestsFunc(ManifestContent content, 
StructType sparkType) {
+    ManifestWriterFactory writers = manifestWriters();
+
+    StructType sparkFileType = (StructType) 
sparkType.apply("data_file").dataType();

Review Comment:
   Yeah, we always use `data_file` struct in the manifest entry, even for 
deletes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org

Reply via email to