pvary commented on code in PR #9346:
URL: https://github.com/apache/iceberg/pull/9346#discussion_r1433119614
##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java:
##########

@@ -77,4 +100,92 @@ protected List<Row> run(
     String optionStr = SqlHelpers.sqlOptionsToString(options);
     return SqlHelpers.sql(getTableEnv(), "select %s from t %s %s", select, optionStr, sqlFilter);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  @Test
+  public void testWatermarkOptions() throws Exception {
+    // Skip AVRO since we don't collect metrics for it, and hence we cannot use watermark column as
+    // there are no stats
+    // re: https://github.com/apache/iceberg/pull/1963
+    Assume.assumeTrue("Temporary skip AVRO", FileFormat.AVRO != fileFormat);
+
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+    // File 1 - early timestamps, new longs
+    Record early1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
+    Record early2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L));
+
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(early1);
+    recordsDataFile1.add(early2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+    // File 2 - late timestamps, old longs
+    Record late1 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L));
+    Record late2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L));
+
+    List<Record> recordsDataFile2 = Lists.newArrayList();
+    recordsDataFile2.add(late1);
+    recordsDataFile2.add(late2);
+    DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+
+    helper.appendToTable(dataFile1, dataFile2);
+
+    // first assertion: early and then late
+    List<Record> expected = Lists.newArrayList();
+    expected.addAll(recordsDataFile1);
+    expected.addAll(recordsDataFile2);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            SCHEMA_TS,
+            ImmutableList.of(),
+            ImmutableMap.of(
+                "watermark-column",
+                "t1",
+                "split-file-open-cost",
+                "128000000",
+                "write-parallelism",
+                "1"),

Review Comment:
   Does this also affect the read parallelism? Could you please check the effective parallelism in the test? I would expect that we need to use https://nightlies.apache.org/flink/flink-docs-release-1.9/dev/table/config.html#table-exec-resource-default-parallelism to change the parallelism of the reads.
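   A minimal sketch of what I have in mind, assuming getTableEnv() returns the TableEnvironment that SqlHelpers.sql(...) runs against and that the option key from the link above is what controls the planner's default parallelism:

       // Hypothetical test setup, not part of the PR: pin the planner's default
       // parallelism so the Iceberg source is read by a single task and the ordered
       // assertion on the emitted records is meaningful.
       getTableEnv()
           .getConfig()
           .getConfiguration()
           .setInteger("table.exec.resource.default-parallelism", 1);

   With something like that in place, the test could also verify the effective read parallelism explicitly instead of relying on the option alone.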