RussellSpitzer commented on code in PR #12840:
URL: https://github.com/apache/iceberg/pull/12840#discussion_r2054845390
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java:
##########
@@ -251,12 +282,40 @@ private List<ManifestFile> writeUnpartitionedManifests(
private List<ManifestFile> writePartitionedManifests(
ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
+ // Extract desired clustering criteria into a dedicated column
+ Dataset<Row> clusteredManifestEntryDF;
+
+ if (partitionFieldClustering != null) {
+ LOG.info(
+ "Clustering manifests for specId {} by partition columns by {} ",
+ spec.specId(),
+ partitionFieldClustering);
+
+ // Map the top level partition column names to the column name referenced within the manifest
+ // entry dataframe
+ Column[] actualPartitionColumns =
+ partitionFieldClustering.stream()
+ .map(p -> col("data_file.partition." + p))
+ .toArray(Column[]::new);
+
+ // Form a new temporary column to cluster manifests on, based on the custom clustering columns
+ // order provided
+ clusteredManifestEntryDF =
+ manifestEntryDF.withColumn(
+ CUSTOM_CLUSTERING_COLUMN_NAME, functions.struct(actualPartitionColumns));
+ } else {
+ clusteredManifestEntryDF =
+ manifestEntryDF.withColumn(CUSTOM_CLUSTERING_COLUMN_NAME, col("data_file.partition"));
+ }
+
return withReusableDS(
- manifestEntryDF,
+ clusteredManifestEntryDF,
df -> {
WriteManifests<?> writeFunc = newWriteManifestsFunc(content, df.schema());
- Column partitionColumn = df.col("data_file.partition");
- Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn, numManifests);
+ Column partitionColumn = df.col(CUSTOM_CLUSTERING_COLUMN_NAME);
Review Comment:
Do we need a custom column here? Since we already have a column expression above, we
can pass it directly into `repartitionAndSort` instead of adding and then removing a
column.
```java
private Column sortColumn() {
  if (partitionFieldClustering != null) {
    ...
    return functions.struct(...);
  } else {
    return new Column("data_file.partition"); // or use a constant here
  }
}

repartitionAndSort(df, sortColumn(), numManifests);
```
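To illustrate the idea without a Spark session, here is a minimal plain-Java analogy (not the PR's code). It builds the sort key as an expression and hands it straight to the sort, so nothing is added to or stripped from the rows. `SortKeySketch`, `Entry`, `sortKey`, `sortedPaths`, and the sample data are all hypothetical names made up for this sketch.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class SortKeySketch {
  // Hypothetical stand-in for a manifest entry: a file path plus partition values.
  static final class Entry {
    final String path;
    final Map<String, String> partition;

    Entry(String path, Map<String, String> partition) {
      this.path = path;
      this.partition = partition;
    }
  }

  // Analogue of the suggested sortColumn(): returns a key expression rather than
  // materializing the key as an extra field on every entry.
  static Function<Entry, String> sortKey(List<String> clusteringFields) {
    if (clusteringFields != null) {
      // Key built only from the requested fields, in the requested order.
      return e ->
          clusteringFields.stream()
              .map(f -> e.partition.get(f))
              .collect(Collectors.joining("/"));
    }
    // Default: every partition value, ordered by field name (an assumption of this sketch).
    return e -> String.join("/", new TreeMap<>(e.partition).values());
  }

  // The key expression is passed directly to the sort; entries are never modified.
  static List<String> sortedPaths(List<Entry> entries, List<String> clusteringFields) {
    return entries.stream()
        .sorted(Comparator.comparing(sortKey(clusteringFields)))
        .map(e -> e.path)
        .collect(Collectors.toList());
  }

  static List<Entry> sample() {
    return List.of(
        new Entry("a.parquet", Map.of("day", "1", "region", "us")),
        new Entry("b.parquet", Map.of("day", "2", "region", "eu")),
        new Entry("c.parquet", Map.of("day", "1", "region", "eu")));
  }

  public static void main(String[] args) {
    // Custom clustering by region, then day: [c.parquet, b.parquet, a.parquet]
    System.out.println(sortedPaths(sample(), List.of("region", "day")));
    // Default key (day, then region): [c.parquet, a.parquet, b.parquet]
    System.out.println(sortedPaths(sample(), null));
  }
}
```

In the Spark version the same shape would hold: the `Column` returned by `sortColumn()` plays the role of `sortKey(...)`, and `repartitionAndSort` consumes it directly.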