rodmeneses commented on code in PR #9346:
URL: https://github.com/apache/iceberg/pull/9346#discussion_r1445280024


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -39,4 +62,76 @@ public void before() throws IOException {
         .getConfiguration()
         .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
   }
+
+  private Record generateRecord(Instant t1) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t1.getEpochSecond());
+    return record;
+  }
+
+  private List<Record> testWatermarkOptionsInternal(boolean ascending) throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+    GenericAppenderHelper helper =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+    // File 1 - early timestamps
+    Record early1 = generateRecord(Instant.ofEpochMilli(baseTime));
+    Record early2 = generateRecord(Instant.ofEpochMilli(baseTime - 10 * 1000L));
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(early1);
+    recordsDataFile1.add(early2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+    // File 2 - late timestamps
+    Record late1 = generateRecord(Instant.ofEpochMilli(baseTime + 14 * 1000L));
+    Record late2 = generateRecord(Instant.ofEpochMilli(baseTime + 12 * 1000L));
+    List<Record> recordsDataFile2 = Lists.newArrayList();
+    recordsDataFile2.add(late1);
+    recordsDataFile2.add(late2);
+    DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+    helper.appendToTable(dataFile1, dataFile2);
+    List<Record> expected = Lists.newArrayList();
+    if (ascending) {
+      expected.addAll(recordsDataFile1);
+      expected.addAll(recordsDataFile2);
+    } else {
+      expected.addAll(recordsDataFile2);
+      expected.addAll(recordsDataFile1);
+    }
+    return expected;
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and watermark-timeunit
+   * options.
+   */
+  @Test
+  public void testWatermarkOptionsAscending() throws Exception {
+    List<Record> expected = testWatermarkOptionsInternal(true);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            ImmutableMap.of("watermark-column", "t1", "split-file-open-cost", 
"128000000"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
+  }
+
+  @Test
+  public void testWatermarkOptionsDescending() throws Exception {
+    List<Record> expected = testWatermarkOptionsInternal(false);

Review Comment:
   The `generateExpectedRecords` method (the new name of the 
`testWatermarkOptionsInternal` method shown above) generates the expected 
records in either ascending or descending order with respect to their data 
file. Each direction is covered by its own unit test.
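
   For illustration only, here is a minimal, self-contained sketch of the 
ordering contract described above. Plain strings stand in for the records 
of each data file; this is not the actual test code, and the helper name 
simply follows the rename discussed here:

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedOrderSketch {
  // Stand-ins for the records written to each data file.
  static final List<String> FILE_1 = List.of("early1", "early2"); // earlier timestamps
  static final List<String> FILE_2 = List.of("late1", "late2"); // later timestamps

  // Concatenates the two files' records in ascending or descending order
  // of their data file, mirroring the helper discussed above.
  static List<String> generateExpectedRecords(boolean ascending) {
    List<String> expected = new ArrayList<>();
    expected.addAll(ascending ? FILE_1 : FILE_2);
    expected.addAll(ascending ? FILE_2 : FILE_1);
    return expected;
  }

  public static void main(String[] args) {
    System.out.println(generateExpectedRecords(true)); // [early1, early2, late1, late2]
    System.out.println(generateExpectedRecords(false)); // [late1, late2, early1, early2]
  }
}
```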



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

