karuppayya commented on code in PR #11045:
URL: https://github.com/apache/iceberg/pull/11045#discussion_r1737628804


##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java:
##########
@@ -242,6 +249,34 @@ public void testUpdateWithIdentifierField() {
         sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
+  @TestTemplate
+  public void testUpdateWithIncomparableType() {
+    createTableWithIncomparableType();
+
+    sql("INSERT INTO %s VALUES (2, Map('b','b'), 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, Map('c','c'), 13), (2, Map('d','d'), 
12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    assertThat(table.sortOrder()).isNotNull();
+
+    List<Object[]> returns =
+        sql("CALL %s.system.create_changelog_view(table => '%s')", 
catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, Collections.singletonMap("b", "b"), 12, INSERT, 0, snap1.snapshotId()),
+            row(2, Collections.singletonMap("b", "b"), 12, DELETE, 1, snap2.snapshotId()),
+            row(2, Collections.singletonMap("d", "d"), 12, INSERT, 1, snap2.snapshotId()),
+            row(3, Collections.singletonMap("c", "c"), 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", viewName));

Review Comment:
   Should we remove this?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -183,21 +185,26 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) {
   }
 
  private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
-    Predicate<String> columnsToKeep;
-    if (netChanges) {
-      Set<String> metadataColumn =
-          Sets.newHashSet(
-              MetadataColumns.CHANGE_TYPE.name(),
-              MetadataColumns.CHANGE_ORDINAL.name(),
-              MetadataColumns.COMMIT_SNAPSHOT_ID.name());
-
-      columnsToKeep = column -> !metadataColumn.contains(column);
-    } else {
-      columnsToKeep = column -> !column.equals(MetadataColumns.CHANGE_TYPE.name());
-    }
+    Set<String> metadataColumn =
+        netChanges
+            ? Sets.newHashSet(
+                MetadataColumns.CHANGE_TYPE.name(),
+                MetadataColumns.CHANGE_ORDINAL.name(),
+                MetadataColumns.COMMIT_SNAPSHOT_ID.name())
+            : Sets.newHashSet(MetadataColumns.CHANGE_TYPE.name());
+
+    Predicate<StructField> columnsToDiscard =
+        field ->
+            metadataColumn.contains(field.name())
+                // avoid sort on incomparable columns
+                || field.dataType() instanceof MapType

Review Comment:
   As mentioned in the [PR desc](https://github.com/apache/spark/blob/master/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/OrderUtils.scala#L25), should we skip all the non-orderable types?
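   A minimal sketch of such a check, in the spirit of the linked `OrderUtils.isOrderable` but using only public Spark types (the helper name and its placement are hypothetical, not code from this PR):
   
   ```java
   import java.util.Arrays;
   import org.apache.spark.sql.types.ArrayType;
   import org.apache.spark.sql.types.DataType;
   import org.apache.spark.sql.types.MapType;
   import org.apache.spark.sql.types.StructType;
   
   // A type is treated as non-orderable if it is, or recursively contains, a
   // map; structs and arrays are orderable only if all their children are.
   private static boolean isOrderable(DataType dataType) {
     if (dataType instanceof MapType) {
       return false;
     } else if (dataType instanceof StructType) {
       return Arrays.stream(((StructType) dataType).fields())
           .allMatch(field -> isOrderable(field.dataType()));
     } else if (dataType instanceof ArrayType) {
       return isOrderable(((ArrayType) dataType).elementType());
     } else {
       return true;
     }
   }
   ```
   
   The discard predicate could then become `field -> metadataColumn.contains(field.name()) || !isOrderable(field.dataType()) || field.dataType() instanceof BinaryType`, which also catches nested cases such as an array or struct of maps (binary would stay a separate exclusion, since Spark does order binary columns).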
   



##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java:
##########
@@ -242,6 +249,34 @@ public void testUpdateWithIdentifierField() {
         sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
+  @TestTemplate
+  public void testUpdateWithIncomparableType() {
+    createTableWithIncomparableType();
+
+    sql("INSERT INTO %s VALUES (2, Map('b','b'), 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, Map('c','c'), 13), (2, Map('d','d'), 
12)", tableName);

Review Comment:
   Doesn't generating changelog rows depend on the ordering of the rows?
   If the ordering on a specific column is dropped, could that return incorrect results?
   For example, if the first column's type were Map, we would sort only on the remaining columns, and we might not be able to detect changes reliably. See the sketch below.
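   A contrived illustration of the concern (assuming a `SparkSession` named `spark`; the rows and column names are made up, not from this PR): if the map column is excluded from the repartition/sort spec, a DELETE and an INSERT that differ only in the map value hash and sort to the same key, so the pair could be dropped as a carryover even though the value changed.
   
   ```java
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   
   // Two changelog rows that differ only in the incomparable map column.
   Dataset<Row> changes =
       spark.sql(
           "SELECT * FROM VALUES "
               + "(1, map('a', 'a'), 10, 'DELETE'), "
               + "(1, map('b', 'b'), 10, 'INSERT') "
               + "AS t(id, data, dep, _change_type)");
   
   // Keyed only on the comparable columns, both rows land on the same key
   // (1, 10) and look like a carryover pair to the dedup logic.
   changes.repartition(changes.col("id"), changes.col("dep"))
       .sortWithinPartitions("id", "dep")
       .show(false);
   ```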
   
   



##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java:
##########
@@ -242,6 +249,34 @@ public void testUpdateWithIdentifierField() {
         sql("select * from %s order by _change_ordinal, id, data", viewName));
   }
 
+  @TestTemplate
+  public void testUpdateWithIncomparableType() {
+    createTableWithIncomparableType();

Review Comment:
   Can the SQL be inlined here?
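   A hypothetical inlined form, assuming the helper just issues a CREATE TABLE with a map column (the column names and types are guesses based on the VALUES in the test):
   
   ```java
   // Inline the table creation instead of calling the helper (schema guessed).
   sql(
       "CREATE TABLE %s (id INT, data MAP<STRING, STRING>, dep INT) USING iceberg",
       tableName);
   ```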



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -183,21 +185,26 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) {
   }
 
  private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
-    Predicate<String> columnsToKeep;
-    if (netChanges) {
-      Set<String> metadataColumn =
-          Sets.newHashSet(
-              MetadataColumns.CHANGE_TYPE.name(),
-              MetadataColumns.CHANGE_ORDINAL.name(),
-              MetadataColumns.COMMIT_SNAPSHOT_ID.name());
-
-      columnsToKeep = column -> !metadataColumn.contains(column);
-    } else {
-      columnsToKeep = column -> !column.equals(MetadataColumns.CHANGE_TYPE.name());
-    }
+    Set<String> metadataColumn =
+        netChanges
+            ? Sets.newHashSet(
+                MetadataColumns.CHANGE_TYPE.name(),
+                MetadataColumns.CHANGE_ORDINAL.name(),
+                MetadataColumns.COMMIT_SNAPSHOT_ID.name())
+            : Sets.newHashSet(MetadataColumns.CHANGE_TYPE.name());
+
+    Predicate<StructField> columnsToDiscard =
+        field ->
+            metadataColumn.contains(field.name())
+                // avoid sort on incomparable columns
+                || field.dataType() instanceof MapType
+                || field.dataType() instanceof BinaryType;
 
     Column[] repartitionSpec =
-        Arrays.stream(df.columns()).filter(columnsToKeep).map(df::col).toArray(Column[]::new);
+        Arrays.stream(df.schema().fields())
+            .filter(Predicate.not(columnsToDiscard))

Review Comment:
   Looks like we use it in the repartitioning spec as well; do we need to remove it from the columns to be repartitioned too? A sketch of the combined spec follows.
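   A sketch completing the spec construction from the diff and feeding it to both steps (the `sortWithinPartitions` call is an assumption about the surrounding code, not quoted from this PR):
   
   ```java
   import java.util.Arrays;
   import java.util.function.Predicate;
   import org.apache.spark.sql.Column;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.types.StructField;
   
   // Assumes df and columnsToDiscard from the diff are in scope.
   Column[] repartitionSpec =
       Arrays.stream(df.schema().fields())
           .filter(Predicate.not(columnsToDiscard))
           .map(StructField::name)
           .map(df::col)
           .toArray(Column[]::new);
   
   // Reusing one spec for both the shuffle and the in-partition sort keeps the
   // hashing and the ordering consistent, so incomparable columns are excluded
   // from both.
   Dataset<Row> partitioned =
       df.repartition(repartitionSpec).sortWithinPartitions(repartitionSpec);
   ```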


