RussellSpitzer commented on code in PR #12840:
URL: https://github.com/apache/iceberg/pull/12840#discussion_r2054845390


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java:
##########
@@ -251,12 +282,40 @@ private List<ManifestFile> writeUnpartitionedManifests(
   private List<ManifestFile> writePartitionedManifests(
       ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
 
+    // Extract desired clustering criteria into a dedicated column
+    Dataset<Row> clusteredManifestEntryDF;
+
+    if (partitionFieldClustering != null) {
+      LOG.info(
+          "Clustering manifests for specId {} by partition columns by {} ",
+          spec.specId(),
+          partitionFieldClustering);
+
+      // Map the top level partition column names to the column name referenced within the manifest
+      // entry dataframe
+      Column[] actualPartitionColumns =
+          partitionFieldClustering.stream()
+              .map(p -> col("data_file.partition." + p))
+              .toArray(Column[]::new);
+
+      // Form a new temporary column to cluster manifests on, based on the custom clustering columns
+      // order provided
+      clusteredManifestEntryDF =
+          manifestEntryDF.withColumn(
+              CUSTOM_CLUSTERING_COLUMN_NAME, functions.struct(actualPartitionColumns));
+    } else {
+      clusteredManifestEntryDF =
+          manifestEntryDF.withColumn(CUSTOM_CLUSTERING_COLUMN_NAME, col("data_file.partition"));
+    }
+
     return withReusableDS(
-        manifestEntryDF,
+        clusteredManifestEntryDF,
         df -> {
           WriteManifests<?> writeFunc = newWriteManifestsFunc(content, df.schema());
-          Column partitionColumn = df.col("data_file.partition");
-          Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn, numManifests);
+          Column partitionColumn = df.col(CUSTOM_CLUSTERING_COLUMN_NAME);

Review Comment:
   Do we need a custom column here? Since we already have a column expression
   above, we can pass it directly into `repartitionAndSort` instead of adding a
   temporary column and then removing it.
   
   ```java
   private Column sortColumn() {
     if (partitionFieldClustering != null) {
       ...
       return functions.struct(...);
     } else {
       return new Column("data_file.partition"); // or use constant here
     }
   }

   repartitionAndSort(df, sortColumn(), numManifests);
   ```
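
To make the name mapping behind the suggestion above concrete, here is a minimal Spark-free sketch of which manifest-entry paths the sort expression would be built from in each branch. The class name `SortColumnSketch`, the method `sortColumnPaths`, and the field names `region` and `day` are hypothetical and not part of the PR. In the actual action these paths would presumably be wrapped with `col(...)` and `functions.struct(...)`, as the diff does.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, Spark-free illustration; names here are assumptions, not PR code.
class SortColumnSketch {
  // Path of the partition struct inside the manifest entry dataframe (taken from the diff).
  static final String PARTITION_PATH = "data_file.partition";

  // Returns the column paths that the sort expression would reference.
  static List<String> sortColumnPaths(List<String> partitionFieldClustering) {
    if (partitionFieldClustering != null) {
      // Custom clustering: one nested path per requested partition field, in the given order.
      return partitionFieldClustering.stream()
          .map(p -> PARTITION_PATH + "." + p)
          .collect(Collectors.toList());
    }
    // Default: sort on the whole partition struct.
    return List.of(PARTITION_PATH);
  }

  public static void main(String[] args) {
    // "region" and "day" are made-up partition field names.
    System.out.println(sortColumnPaths(List.of("region", "day")));
    System.out.println(sortColumnPaths(null));
  }
}
```

Because the result is just an expression derived from the existing dataframe, nothing has to be added to or dropped from the schema before sorting.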



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

