karuppayya commented on code in PR #11045: URL: https://github.com/apache/iceberg/pull/11045#discussion_r1737628804
########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java: ########## @@ -242,6 +249,34 @@ public void testUpdateWithIdentifierField() { sql("select * from %s order by _change_ordinal, id, data", viewName)); } + @TestTemplate + public void testUpdateWithInComparableType() { + createTableWithIncomparableType(); + + sql("INSERT INTO %s VALUES (2, Map('b','b'), 12)", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Snapshot snap1 = table.currentSnapshot(); + + sql("INSERT OVERWRITE %s VALUES (3, Map('c','c'), 13), (2, Map('d','d'), 12)", tableName); + table.refresh(); + Snapshot snap2 = table.currentSnapshot(); + + assertThat(table.sortOrder()).isNotNull(); + + List<Object[]> returns = + sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName); + + String viewName = (String) returns.get(0)[0]; + assertEquals( + "Rows should match", + ImmutableList.of( + row(2, Collections.singletonMap("b", "b"), 12, INSERT, 0, snap1.snapshotId()), + row(2, Collections.singletonMap("b", "b"), 12, DELETE, 1, snap2.snapshotId()), + row(2, Collections.singletonMap("d", "d"), 12, INSERT, 1, snap2.snapshotId()), + row(3, Collections.singletonMap("c", "c"), 13, INSERT, 1, snap2.snapshotId())), + sql("select * from %s order by _change_ordinal, id", viewName)); Review Comment: Should we remove this? 
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java: ########## @@ -183,21 +185,26 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) { } private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) { - Predicate<String> columnsToKeep; - if (netChanges) { - Set<String> metadataColumn = - Sets.newHashSet( - MetadataColumns.CHANGE_TYPE.name(), - MetadataColumns.CHANGE_ORDINAL.name(), - MetadataColumns.COMMIT_SNAPSHOT_ID.name()); - - columnsToKeep = column -> !metadataColumn.contains(column); - } else { - columnsToKeep = column -> !column.equals(MetadataColumns.CHANGE_TYPE.name()); - } + Set<String> metadataColumn = + netChanges + ? Sets.newHashSet( + MetadataColumns.CHANGE_TYPE.name(), + MetadataColumns.CHANGE_ORDINAL.name(), + MetadataColumns.COMMIT_SNAPSHOT_ID.name()) + : Sets.newHashSet(MetadataColumns.CHANGE_TYPE.name()); + + Predicate<StructField> columnsToDiscard = + field -> + metadataColumn.contains(field.name()) + // avoid sort on incomparable columns + || field.dataType() instanceof MapType Review Comment: As mentioned in the [PR description](https://github.com/apache/spark/blob/master/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala#L25), should we skip all non-orderable types rather than only specific ones? ########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java: ########## @@ -242,6 +249,34 @@ public void testUpdateWithIdentifierField() { sql("select * from %s order by _change_ordinal, id, data", viewName)); } + @TestTemplate + public void testUpdateWithInComparableType() { + createTableWithIncomparableType(); + + sql("INSERT INTO %s VALUES (2, Map('b','b'), 12)", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Snapshot snap1 = table.currentSnapshot(); + + sql("INSERT OVERWRITE %s VALUES (3, Map('c','c'), 13), (2, Map('d','d'), 12)", tableName); Review Comment: Doesn't 
generating changelog rows depend on the ordering of the rows? If a specific column is excluded from the sort, could this return incorrect results? For example, if the first column's type were Map, we would sort only on the remaining columns, and we might not be able to detect changes reliably. ########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java: ########## @@ -242,6 +249,34 @@ public void testUpdateWithIdentifierField() { sql("select * from %s order by _change_ordinal, id, data", viewName)); } + @TestTemplate + public void testUpdateWithInComparableType() { + createTableWithIncomparableType(); Review Comment: Can the table-creation SQL be inlined here? ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java: ########## @@ -183,21 +185,26 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) { } private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) { - Predicate<String> columnsToKeep; - if (netChanges) { - Set<String> metadataColumn = - Sets.newHashSet( - MetadataColumns.CHANGE_TYPE.name(), - MetadataColumns.CHANGE_ORDINAL.name(), - MetadataColumns.COMMIT_SNAPSHOT_ID.name()); - - columnsToKeep = column -> !metadataColumn.contains(column); - } else { - columnsToKeep = column -> !column.equals(MetadataColumns.CHANGE_TYPE.name()); - } + Set<String> metadataColumn = + netChanges + ? 
Sets.newHashSet( + MetadataColumns.CHANGE_TYPE.name(), + MetadataColumns.CHANGE_ORDINAL.name(), + MetadataColumns.COMMIT_SNAPSHOT_ID.name()) + : Sets.newHashSet(MetadataColumns.CHANGE_TYPE.name()); + + Predicate<StructField> columnsToDiscard = + field -> + metadataColumn.contains(field.name()) + // avoid sort on incomparable columns + || field.dataType() instanceof MapType + || field.dataType() instanceof BinaryType; Column[] repartitionSpec = - Arrays.stream(df.columns()).filter(columnsToKeep).map(df::col).toArray(Column[]::new); + Arrays.stream(df.schema().fields()) + .filter(Predicate.not(columnsToDiscard)) Review Comment: It looks like this predicate is also used to build the repartition spec. Do we need to remove these columns from the repartition columns too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org