szehon-ho commented on code in PR #6350:
URL: https://github.com/apache/iceberg/pull/6350#discussion_r1043984156


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -150,7 +150,7 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
 
   /**
   * Traverses the history of the table's current snapshot and finds the first snapshot committed
-   * after the given time.
+   * after the given time(inclusive).

Review Comment:
   Nit: add a space before `(inclusive)`.



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java:
##########
@@ -137,6 +138,64 @@ public void testOverwrites() {
         changelogRecords(snap2, snap3));
   }
 
+  @Test
+  public void testQueryWithTimeRange() {
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected changed rows only from snapshot 3",
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap3.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap3.snapshotId())),
+        changelogRecords(snap2.timestampMillis() + 1, snap3.timestampMillis()));

Review Comment:
   Should we use the mechanism of `TestExpireSnapshot::rightAfterSnapshot`? I think it's a bit more readable.
   It also guarantees the snapshot timestamps are not the same (though that's unlikely), which could otherwise lead to weird failures.
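   For illustration, roughly what I have in mind (a sketch only; the helper name and behavior here are my assumption, not the actual `TestExpireSnapshot` code):
   ```java
   // Hypothetical helper: busy-wait until the wall clock is strictly past the snapshot's
   // commit timestamp, then return that later value, so the returned timestamp is
   // guaranteed to differ from snap2's and to fall before the next commit.
   private long rightAfterSnapshot(Snapshot snapshot) {
     long snapshotTs = snapshot.timestampMillis();
     long current = System.currentTimeMillis();
     while (current <= snapshotTs) {
       current = System.currentTimeMillis();
     }
     return current;
   }
   ```
   The test could then pass `rightAfterSnapshot(snap2)` as the start timestamp instead of `snap2.timestampMillis() + 1`.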



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -285,6 +286,36 @@ public Scan buildChangelogScan() {
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
+    Long startTimestamp = readConf.startTimestamp();
+    Long endTimestamp = readConf.endTimestamp();
+
+    Preconditions.checkArgument(
+        !(startSnapshotId != null && startTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",

Review Comment:
   Too many negatives; I'm a bit confused. Does this capture it: "Cannot set both %s and %s"?
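   i.e. something along these lines (just a sketch of the wording change):
   ```java
   Preconditions.checkArgument(
       !(startSnapshotId != null && startTimestamp != null),
       "Cannot set both %s and %s for changelogs",
       SparkReadOptions.START_SNAPSHOT_ID,
       SparkReadOptions.START_TIMESTAMP);
   ```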



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -308,6 +339,17 @@ public Scan buildChangelogScan() {
     return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  private Long getStartSnapshotId(Long startTimestamp) {
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+    Preconditions.checkArgument(

Review Comment:
   To me, it may be slightly more user-friendly to return an empty result set here instead of failing. But I'm open to either.
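   As a sketch of what I mean (whether `oldestAncestorAfter` can signal "no snapshot after this timestamp" this way is my assumption, not something I verified in this PR):
   ```java
   private Long getStartSnapshotId(Long startTimestamp) {
     Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
     if (oldestSnapshotAfter == null) {
       // No snapshot committed after the requested start time: the caller could
       // short-circuit to an empty changelog result instead of throwing.
       return null;
     }
     return oldestSnapshotAfter.snapshotId();
   }
   ```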



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java:
##########
@@ -32,6 +32,12 @@ private SparkReadOptions() {}
   // End snapshot ID used in incremental scans (inclusive)
   public static final String END_SNAPSHOT_ID = "end-snapshot-id";
 
+  // Start timestamp used in multi-snapshot scans (exclusive)
+  public static final String START_TIMESTAMP = "start-timestamp";

Review Comment:
   Looks like Flink has a config called 'start-snapshot-timestamp'. It's a bit wordy, but I'm wondering whether we need to synchronize the two (if they are related). cc @stevenzwu @pvary @hililiwei



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -285,6 +286,36 @@ public Scan buildChangelogScan() {
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
+    Long startTimestamp = readConf.startTimestamp();
+    Long endTimestamp = readConf.endTimestamp();
+
+    Preconditions.checkArgument(
+        !(startSnapshotId != null && startTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",
+        SparkReadOptions.START_SNAPSHOT_ID,
+        SparkReadOptions.START_TIMESTAMP);
+
+    Preconditions.checkArgument(
+        !(endSnapshotId != null && endTimestamp != null),
+        "Cannot set neither %s nor %s for changelogs",
+        SparkReadOptions.END_SNAPSHOT_ID,
+        SparkReadOptions.END_TIMESTAMP);
+
+    if (startTimestamp != null && endTimestamp != null) {
+      Preconditions.checkArgument(
+          startTimestamp < endTimestamp,
+          "Cannot set %s to be greater than %s for changelogs",
+          SparkReadOptions.START_TIMESTAMP,
+          SparkReadOptions.END_TIMESTAMP);
+    }
+
+    if (startTimestamp != null) {
+      startSnapshotId = getStartSnapshotId(startTimestamp);
+    }
+
+    if (endTimestamp != null) {
+      endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+    }

Review Comment:
   Do we need to add corresponding checks in `build()` that these new properties are not set (like the check we have today for `start-snapshot-id`)?
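   Something like the following in `build()` (a sketch; I'm paraphrasing the existing `start-snapshot-id` check from memory, so the exact shape and message are assumptions):
   ```java
   Preconditions.checkArgument(
       readConf.startTimestamp() == null && readConf.endTimestamp() == null,
       "Cannot set %s or %s for batch queries; they are only supported for changelog scans",
       SparkReadOptions.START_TIMESTAMP,
       SparkReadOptions.END_TIMESTAMP);
   ```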



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

