bharos commented on code in PR #12327:
URL: https://github.com/apache/iceberg/pull/12327#discussion_r1968579723


##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java:
##########
@@ -635,6 +635,67 @@ public void 
addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() {
         sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
   }
 
+  @TestTemplate
+  public void addFileTableOldSpecDataAfterPartitionSpecEvolved() {
+    createPartitionedFileTable("parquet");
+    createIcebergTable(
+        "id Integer, name String, dept String, subdept String",
+        "PARTITIONED BY (id, dept, subdept)");
+    sql("ALTER TABLE %s DROP PARTITION FIELD dept", tableName);
+    sql(
+        "ALTER TABLE %s DROP PARTITION FIELD subdept",
+        tableName); // This spec is matching with the input data which is 
partitioned just by "id"
+    sql("ALTER TABLE %s ADD PARTITION FIELD subdept", tableName);
+
+    if (formatVersion == 1) {
+      assertThatThrownBy(

Review Comment:
   I think keeping this assertion will help readers understand that failing in V1 is 
expected, at least for somebody not familiar with void transforms in V1.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -265,55 +271,16 @@ public String description() {
     return "AddFiles";
   }
 
-  private void validatePartitionSpec(Table table, Map<String, String> 
partitionFilter) {
-    List<PartitionField> partitionFields = table.spec().fields();
-    Set<String> partitionNames =
-        
table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet());
-
-    boolean tablePartitioned = !partitionFields.isEmpty();
-    boolean partitionSpecPassed = !partitionFilter.isEmpty();
-
+  private void nonIdentityPartitionCheck(Table table) {
     // Check for any non-identity partition columns
     List<PartitionField> nonIdentityFields =
-        partitionFields.stream()
+        table.spec().fields().stream()

Review Comment:
   Removing this method as per the above discussion.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -1124,7 +1115,67 @@ private static PartitionSpec findCompatibleSpec(
     throw new IllegalArgumentException(
         String.format(
             "Cannot find a partition spec in Iceberg table %s that matches the 
partition"
-                + " columns (%s) in Spark table %s",
-            icebergTable, sparkPartNames, sparkTable));
+                + " columns (%s) in input table",
+            icebergTable, partitionNames));
+  }
+
+  /**
+   * Returns the first partition spec in an IcebergTable that shares the same 
names and ordering as
+   * the partition columns in a given Spark Table. Throws an error if not found
+   */
+  private static PartitionSpec findCompatibleSpec(
+      Table icebergTable, SparkSession spark, String sparkTable) throws 
AnalysisException {
+    List<String> parts = 
Lists.newArrayList(Splitter.on('.').limit(2).split(sparkTable));
+    String db = parts.size() == 1 ? "default" : parts.get(0);
+    String table = parts.get(parts.size() == 1 ? 0 : 1);
+
+    List<String> sparkPartNames =
+        spark.catalog().listColumns(db, table).collectAsList().stream()
+            .filter(org.apache.spark.sql.catalog.Column::isPartition)
+            .map(org.apache.spark.sql.catalog.Column::name)
+            .map(name -> name.toLowerCase(Locale.ROOT))

Review Comment:
   done



##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java:
##########
@@ -635,6 +635,67 @@ public void 
addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() {
         sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
   }
 
+  @TestTemplate
+  public void addFileTableOldSpecDataAfterPartitionSpecEvolved() {
+    createPartitionedFileTable("parquet");
+    createIcebergTable(
+        "id Integer, name String, dept String, subdept String",
+        "PARTITIONED BY (id, dept, subdept)");
+    sql("ALTER TABLE %s DROP PARTITION FIELD dept", tableName);
+    sql(
+        "ALTER TABLE %s DROP PARTITION FIELD subdept",
+        tableName); // This spec is matching with the input data which is 
partitioned just by "id"
+    sql("ALTER TABLE %s ADD PARTITION FIELD subdept", tableName);
+
+    if (formatVersion == 1) {
+      assertThatThrownBy(
+              () ->
+                  scalarSql(
+                      "CALL %s.system.add_files('%s', '`parquet`.`%s`')",
+                      catalogName, tableName, fileTableDir.getAbsolutePath()))
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessageContaining(
+              String.format(
+                  "Cannot add data files to target table %s because that table 
is partitioned and contains non-identity partition transforms which will not be 
compatible.",
+                  tableName));
+      return;
+    }
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.add_files(table => '%s', source_table => 
'`parquet`.`%s`')",
+            catalogName, tableName, fileTableDir.getAbsolutePath());
+
+    assertOutput(result, 8L, 4L);
+    assertEquals(
+        "Iceberg table contains correct data",
+        sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", 
sourceTableName),
+        sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));

Review Comment:
   Added



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -265,55 +271,16 @@ public String description() {
     return "AddFiles";
   }
 
-  private void validatePartitionSpec(Table table, Map<String, String> 
partitionFilter) {
-    List<PartitionField> partitionFields = table.spec().fields();
-    Set<String> partitionNames =
-        
table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet());
-
-    boolean tablePartitioned = !partitionFields.isEmpty();
-    boolean partitionSpecPassed = !partitionFilter.isEmpty();
-
+  private void nonIdentityPartitionCheck(Table table) {

Review Comment:
   Makes sense. Since we already check `if (allIdentity)` in `findCompatibleSpec`, we 
can remove this one.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -1124,7 +1115,67 @@ private static PartitionSpec findCompatibleSpec(
     throw new IllegalArgumentException(
         String.format(
             "Cannot find a partition spec in Iceberg table %s that matches the 
partition"
-                + " columns (%s) in Spark table %s",
-            icebergTable, sparkPartNames, sparkTable));
+                + " columns (%s) in input table",
+            icebergTable, partitionNames));
+  }
+
+  /**
+   * Returns the first partition spec in an IcebergTable that shares the same 
names and ordering as
+   * the partition columns in a given Spark Table. Throws an error if not found
+   */
+  private static PartitionSpec findCompatibleSpec(
+      Table icebergTable, SparkSession spark, String sparkTable) throws 
AnalysisException {
+    List<String> parts = 
Lists.newArrayList(Splitter.on('.').limit(2).split(sparkTable));
+    String db = parts.size() == 1 ? "default" : parts.get(0);
+    String table = parts.get(parts.size() == 1 ? 0 : 1);
+
+    List<String> sparkPartNames =
+        spark.catalog().listColumns(db, table).collectAsList().stream()
+            .filter(org.apache.spark.sql.catalog.Column::isPartition)
+            .map(org.apache.spark.sql.catalog.Column::name)
+            .map(name -> name.toLowerCase(Locale.ROOT))
+            .collect(Collectors.toList());
+    return findCompatibleSpec(sparkPartNames, icebergTable);
+  }
+
+  public static void validatePartitionFilter(

Review Comment:
   `AddFilesProcedure` is in package `org.apache.iceberg.spark.procedures`, while this 
one is in package `org.apache.iceberg.spark`.
   
   Is there a common package this could be moved to? Otherwise, I will keep both 
`findCompatibleSpec` and `validatePartitionFilter` public here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to