bharos commented on code in PR #12327: URL: https://github.com/apache/iceberg/pull/12327#discussion_r1968579723
########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java: ########## @@ -635,6 +635,67 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } + @TestTemplate + public void addFileTableOldSpecDataAfterPartitionSpecEvolved() { + createPartitionedFileTable("parquet"); + createIcebergTable( + "id Integer, name String, dept String, subdept String", + "PARTITIONED BY (id, dept, subdept)"); + sql("ALTER TABLE %s DROP PARTITION FIELD dept", tableName); + sql( + "ALTER TABLE %s DROP PARTITION FIELD subdept", + tableName); // This spec is matching with the input data which is partitioned just by "id" + sql("ALTER TABLE %s ADD PARTITION FIELD subdept", tableName); + + if (formatVersion == 1) { + assertThatThrownBy( Review Comment: I think keeping this will help understand that failing in V1 is expected, at least for somebody not familiar with the void transforms in v1 ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java: ########## @@ -265,55 +271,16 @@ public String description() { return "AddFiles"; } - private void validatePartitionSpec(Table table, Map<String, String> partitionFilter) { - List<PartitionField> partitionFields = table.spec().fields(); - Set<String> partitionNames = - table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet()); - - boolean tablePartitioned = !partitionFields.isEmpty(); - boolean partitionSpecPassed = !partitionFilter.isEmpty(); - + private void nonIdentityPartitionCheck(Table table) { // Check for any non-identity partition columns List<PartitionField> nonIdentityFields = - partitionFields.stream() + table.spec().fields().stream() Review Comment: Removing this method as per above discussion ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java: ########## @@ -1124,7 +1115,67 @@ 
private static PartitionSpec findCompatibleSpec( throw new IllegalArgumentException( String.format( "Cannot find a partition spec in Iceberg table %s that matches the partition" - + " columns (%s) in Spark table %s", - icebergTable, sparkPartNames, sparkTable)); + + " columns (%s) in input table", + icebergTable, partitionNames)); + } + + /** + * Returns the first partition spec in an IcebergTable that shares the same names and ordering as + * the partition columns in a given Spark Table. Throws an error if not found + */ + private static PartitionSpec findCompatibleSpec( + Table icebergTable, SparkSession spark, String sparkTable) throws AnalysisException { + List<String> parts = Lists.newArrayList(Splitter.on('.').limit(2).split(sparkTable)); + String db = parts.size() == 1 ? "default" : parts.get(0); + String table = parts.get(parts.size() == 1 ? 0 : 1); + + List<String> sparkPartNames = + spark.catalog().listColumns(db, table).collectAsList().stream() + .filter(org.apache.spark.sql.catalog.Column::isPartition) + .map(org.apache.spark.sql.catalog.Column::name) + .map(name -> name.toLowerCase(Locale.ROOT)) Review Comment: done ########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java: ########## @@ -635,6 +635,67 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } + @TestTemplate + public void addFileTableOldSpecDataAfterPartitionSpecEvolved() { + createPartitionedFileTable("parquet"); + createIcebergTable( + "id Integer, name String, dept String, subdept String", + "PARTITIONED BY (id, dept, subdept)"); + sql("ALTER TABLE %s DROP PARTITION FIELD dept", tableName); + sql( + "ALTER TABLE %s DROP PARTITION FIELD subdept", + tableName); // This spec is matching with the input data which is partitioned just by "id" + sql("ALTER TABLE %s ADD PARTITION FIELD subdept", tableName); + + if (formatVersion == 1) { + 
assertThatThrownBy( + () -> + scalarSql( + "CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + String.format( + "Cannot add data files to target table %s because that table is partitioned and contains non-identity partition transforms which will not be compatible.", + tableName)); + return; + } + + List<Object[]> result = + sql( + "CALL %s.system.add_files(table => '%s', source_table => '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + assertOutput(result, 8L, 4L); + assertEquals( + "Iceberg table contains correct data", + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); Review Comment: Added ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java: ########## @@ -265,55 +271,16 @@ public String description() { return "AddFiles"; } - private void validatePartitionSpec(Table table, Map<String, String> partitionFilter) { - List<PartitionField> partitionFields = table.spec().fields(); - Set<String> partitionNames = - table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet()); - - boolean tablePartitioned = !partitionFields.isEmpty(); - boolean partitionSpecPassed = !partitionFilter.isEmpty(); - + private void nonIdentityPartitionCheck(Table table) { Review Comment: Makes sense. 
Since we check if (allIdentity) in findCompatibleSpec, we can remove this one. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java: ########## @@ -1124,7 +1115,67 @@ private static PartitionSpec findCompatibleSpec( throw new IllegalArgumentException( String.format( "Cannot find a partition spec in Iceberg table %s that matches the partition" - + " columns (%s) in Spark table %s", - icebergTable, sparkPartNames, sparkTable)); + + " columns (%s) in input table", + icebergTable, partitionNames)); + } + + /** + * Returns the first partition spec in an IcebergTable that shares the same names and ordering as + * the partition columns in a given Spark Table. Throws an error if not found + */ + private static PartitionSpec findCompatibleSpec( + Table icebergTable, SparkSession spark, String sparkTable) throws AnalysisException { + List<String> parts = Lists.newArrayList(Splitter.on('.').limit(2).split(sparkTable)); + String db = parts.size() == 1 ? "default" : parts.get(0); + String table = parts.get(parts.size() == 1 ? 0 : 1); + + List<String> sparkPartNames = + spark.catalog().listColumns(db, table).collectAsList().stream() + .filter(org.apache.spark.sql.catalog.Column::isPartition) + .map(org.apache.spark.sql.catalog.Column::name) + .map(name -> name.toLowerCase(Locale.ROOT)) + .collect(Collectors.toList()); + return findCompatibleSpec(sparkPartNames, icebergTable); + } + + public static void validatePartitionFilter( Review Comment: AddFilesProcedure is in package org.apache.iceberg.spark.procedures, while this method is in package org.apache.iceberg.spark. Is there a common package that this can be moved to? Otherwise, I will keep it public here for both findCompatibleSpec and validatePartitionFilter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org