flyrain commented on code in PR #10252: URL: https://github.com/apache/iceberg/pull/10252#discussion_r1615317449
########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java: ########## @@ -291,6 +291,59 @@ public void testMetadataColumns() { rows); } + @TestTemplate + public void testQueryWithRollback() { + createTable(); + + sql("INSERT INTO %s VALUES (1, 'a')", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Snapshot snap1 = table.currentSnapshot(); + long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis()); + + sql("INSERT INTO %s VALUES (2, 'b')", tableName); + table.refresh(); + Snapshot snap2 = table.currentSnapshot(); + long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis()); + + sql( + "CALL %s.system.rollback_to_snapshot('%s', %d)", + catalogName, tableIdent, snap1.snapshotId()); + table.refresh(); + assertThat(table.currentSnapshot()).isEqualTo(snap1); + + sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName); + table.refresh(); + Snapshot snap3 = table.currentSnapshot(); + long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis()); + + assertEquals( + "Should have expected changed rows up to snapshot 3", + ImmutableList.of( + row(1, "a", "INSERT", 0, snap1.snapshotId()), + row(1, "a", "DELETE", 1, snap3.snapshotId()), + row(-2, "a", "INSERT", 1, snap3.snapshotId())), + changelogRecords(null, rightAfterSnap3)); + + assertEquals( + "Should have expected changed rows up to snapshot 2", + ImmutableList.of(row(1, "a", "INSERT", 0, snap1.snapshotId())), + changelogRecords(null, rightAfterSnap2)); + + assertEquals( + "Should have expected changed rows from snapshot 2 and 3", Review Comment: It should not include change rows from snapshot 2. The result is correct, but the message is a bit misleading. How about something like this? ``` Should have expected changed rows from snapshot 3 only since snapshot 2 is in a different branch. 
``` ########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java: ########## @@ -291,6 +291,59 @@ public void testMetadataColumns() { rows); } + @TestTemplate + public void testQueryWithRollback() { + createTable(); + + sql("INSERT INTO %s VALUES (1, 'a')", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Snapshot snap1 = table.currentSnapshot(); + long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis()); + + sql("INSERT INTO %s VALUES (2, 'b')", tableName); + table.refresh(); + Snapshot snap2 = table.currentSnapshot(); + long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis()); + + sql( + "CALL %s.system.rollback_to_snapshot('%s', %d)", + catalogName, tableIdent, snap1.snapshotId()); + table.refresh(); + assertThat(table.currentSnapshot()).isEqualTo(snap1); + + sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName); + table.refresh(); + Snapshot snap3 = table.currentSnapshot(); + long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis()); + + assertEquals( + "Should have expected changed rows up to snapshot 3", + ImmutableList.of( + row(1, "a", "INSERT", 0, snap1.snapshotId()), + row(1, "a", "DELETE", 1, snap3.snapshotId()), + row(-2, "a", "INSERT", 1, snap3.snapshotId())), + changelogRecords(null, rightAfterSnap3)); + + assertEquals( + "Should have expected changed rows up to snapshot 2", + ImmutableList.of(row(1, "a", "INSERT", 0, snap1.snapshotId())), + changelogRecords(null, rightAfterSnap2)); + + assertEquals( + "Should have expected changed rows from snapshot 2 and 3", + ImmutableList.of( + row(1, "a", "DELETE", 0, snap3.snapshotId()), + row(-2, "a", "INSERT", 0, snap3.snapshotId())), + changelogRecords(rightAfterSnap1, snap3.timestampMillis())); + + assertEquals( + "Should have expected changed rows from snapshot 3", + ImmutableList.of( + row(1, "a", "DELETE", 0, snap3.snapshotId()), + row(-2, "a", "INSERT", 0, snap3.snapshotId())), + changelogRecords(rightAfterSnap2, 
null)); Review Comment: Can we add more cases by rolling back to snapshot 2? We want to test that it doesn't pick up the latest snapshot 3 when it is not in the main branch. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java: ########## @@ -589,6 +589,24 @@ private Long getStartSnapshotId(Long startTimestamp) { } } + private Long getEndSnapshotId(Long endTimestamp) { + Long snapshotId = null; + Snapshot current = table.currentSnapshot(); + if (current != null) { + if (current.timestampMillis() <= endTimestamp) { + snapshotId = current.snapshotId(); + } else { + for (Snapshot ancestor : SnapshotUtil.currentAncestors(table)) { + if (ancestor.timestampMillis() <= endTimestamp) { + snapshotId = ancestor.snapshotId(); + break; + } + } + } + } Review Comment: `SnapshotUtil.currentAncestors(table)` includes the current snapshot as well. We could simplify the logic a bit. ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java: ########## @@ -589,6 +589,24 @@ private Long getStartSnapshotId(Long startTimestamp) { } } + private Long getEndSnapshotId(Long endTimestamp) { + Long snapshotId = null; + Snapshot current = table.currentSnapshot(); + if (current != null) { + if (current.timestampMillis() <= endTimestamp) { + snapshotId = current.snapshotId(); + } else { + for (Snapshot ancestor : SnapshotUtil.currentAncestors(table)) { + if (ancestor.timestampMillis() <= endTimestamp) { + snapshotId = ancestor.snapshotId(); + break; + } + } + } + } Review Comment: Also I think we can move this method to the class `SnapshotUtil`. It could be useful for other scans as well. For example, I'm not sure if a time travel query like this has a similar bug. We can double check on that, but it's a blocker for this PR. ``` SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00'; ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org