stevenzwu commented on code in PR #9346: URL: https://github.com/apache/iceberg/pull/9346#discussion_r1443084956
##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java:
##########
@@ -109,4 +110,13 @@ private FlinkReadOptions() {}
   public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
   public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
       ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
+
+  public static final String WATERMARK_COLUMN = "watermark-column";
+  public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();
+  public static final String WATERMARK_TIME_COLUMN_UNIT = "watermark-column-time-unit";

Review Comment:
   nit: it seems the style is to have an empty line between config options


##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -482,24 +478,25 @@ public IcebergSource<T> build() {
     }

     contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
     Schema icebergSchema = table.schema();
     if (projectedFlinkSchema != null) {
       contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
     }

     SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+    FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+    String watermarkColumn = flinkReadConf.watermarkColumn();
+    TimeUnit watermarkTimeUnit = flinkReadConf.watermarkTimeUnit();
+
     if (watermarkColumn != null) {
       // Column statistics is needed for watermark generation
       contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
-

Review Comment:
   nit: revert the unnecessary empty line change


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java:
##########
@@ -29,11 +31,17 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.types.Types;
 import org.junit.Before;

 public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
   private volatile TableEnvironment tEnv;

+  private static final Schema SCHEMA_TS =

Review Comment:
   all changes in this file should be reverted, right?


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -39,4 +62,76 @@ public void before() throws IOException {
         .getConfiguration()
         .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
   }
+
+  private Record generateRecord(Instant t1) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t1.getEpochSecond());
+    return record;
+  }
+
+  private List<Record> testWatermarkOptionsInternal(boolean ascending) throws Exception {

Review Comment:
   this is not actually a `test`; it just prepares the table by appending records/files to it. Maybe also add a Javadoc to explain that the returned list is the expected order of the results.
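   To make the Javadoc suggestion concrete, the helper could look roughly like the sketch below. The name `prepareTableWithWatermarkData` is only an illustrative placeholder; the body is otherwise the one from the diff quoted further down in this message.

  /**
   * Prepares the table by appending two data files, one with early timestamps and one with late
   * timestamps. This is setup code, not a test by itself.
   *
   * @param ascending whether splits are expected to be consumed in ascending watermark order
   * @return the appended records, in the order the query is expected to return them
   */
  private List<Record> prepareTableWithWatermarkData(boolean ascending) throws Exception {
    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
    long baseTime = 1702382109000L;
    GenericAppenderHelper helper =
        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);

    // File 1 - early timestamps
    List<Record> recordsDataFile1 = Lists.newArrayList();
    recordsDataFile1.add(generateRecord(Instant.ofEpochMilli(baseTime)));
    recordsDataFile1.add(generateRecord(Instant.ofEpochMilli(baseTime - 10 * 1000L)));
    DataFile dataFile1 = helper.writeFile(recordsDataFile1);

    // File 2 - late timestamps
    List<Record> recordsDataFile2 = Lists.newArrayList();
    recordsDataFile2.add(generateRecord(Instant.ofEpochMilli(baseTime + 14 * 1000L)));
    recordsDataFile2.add(generateRecord(Instant.ofEpochMilli(baseTime + 12 * 1000L)));
    DataFile dataFile2 = helper.writeFile(recordsDataFile2);

    helper.appendToTable(dataFile1, dataFile2);

    // The return value is the expected result order, driven by the files' timestamp ranges
    // rather than by the append order.
    List<Record> expected = Lists.newArrayList();
    if (ascending) {
      expected.addAll(recordsDataFile1);
      expected.addAll(recordsDataFile2);
    } else {
      expected.addAll(recordsDataFile2);
      expected.addAll(recordsDataFile1);
    }
    return expected;
  }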
##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -482,24 +478,25 @@ public IcebergSource<T> build() {
     }

     contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
     Schema icebergSchema = table.schema();
     if (projectedFlinkSchema != null) {
       contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
     }

     SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+    FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+    String watermarkColumn = flinkReadConf.watermarkColumn();
+    TimeUnit watermarkTimeUnit = flinkReadConf.watermarkTimeUnit();
+
     if (watermarkColumn != null) {
       // Column statistics is needed for watermark generation
       contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
-
       SplitWatermarkExtractor watermarkExtractor =
           new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
       emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
       splitAssignerFactory =
           new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
     }
-

Review Comment:
   nit: unnecessary empty line change. Iceberg style keeps an empty line after a control block that ends with `}`.


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##########
@@ -126,7 +126,7 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType rowType) {
         .collect(Collectors.toList());
   }

-  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+  public static List<Row> buildRecordsExpected(List<Record> expectedRecords, Schema schema) {

Review Comment:
   this method should be private. A better name could be `convertRecordToRow`.
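   Roughly what I have in mind, as a sketch only: a simplified positional Record-to-Row conversion to show the shape and the order-preserving contract, assuming TestHelpers' existing imports for Row, Record, Schema, Types, and Lists. The real helper would keep the RowData-based conversion logic that `buildRecordsExpected` already has; this stand-in just copies field values by position.

  private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
    List<Row> rows = Lists.newArrayList();
    List<Types.NestedField> columns = schema.columns();
    for (Record record : expectedRecords) {
      // Copy field values positionally; input order is preserved, which is what the
      // order-sensitive assertions rely on.
      Row row = new Row(columns.size());
      for (int i = 0; i < columns.size(); i++) {
        row.setField(i, record.getField(columns.get(i).name()));
      }
      rows.add(row);
    }
    return rows;
  }

   Both `assertRecords` (unordered) and the new `assertRecordsWithOrder` could then delegate to it.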
##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -39,4 +62,76 @@ public void before() throws IOException {
         .getConfiguration()
         .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
   }
+
+  private Record generateRecord(Instant t1) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t1.getEpochSecond());
+    return record;
+  }
+
+  private List<Record> testWatermarkOptionsInternal(boolean ascending) throws Exception {
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+    GenericAppenderHelper helper =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER);
+    // File 1 - early timestamps
+    Record early1 = generateRecord(Instant.ofEpochMilli(baseTime));
+    Record early2 = generateRecord(Instant.ofEpochMilli(baseTime - 10 * 1000L));
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(early1);
+    recordsDataFile1.add(early2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+    // File 2 - late timestamps
+    Record late1 = generateRecord(Instant.ofEpochMilli(baseTime + 14 * 1000L));
+    Record late2 = generateRecord(Instant.ofEpochMilli(baseTime + 12 * 1000L));
+    List<Record> recordsDataFile2 = Lists.newArrayList();
+    recordsDataFile2.add(late1);
+    recordsDataFile2.add(late2);
+    DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+    helper.appendToTable(dataFile1, dataFile2);
+    List<Record> expected = Lists.newArrayList();
+    if (ascending) {
+      expected.addAll(recordsDataFile1);
+      expected.addAll(recordsDataFile2);
+    } else {
+      expected.addAll(recordsDataFile2);
+      expected.addAll(recordsDataFile1);
+    }
+    return expected;
+  }
+
+  /**
+   * Tests the order of splits returned when setting the watermark-column and watermark-timeunit
+   * options
+   */
+  @Test
+  public void testWatermarkOptionsAscending() throws Exception {
+    List<Record> expected = testWatermarkOptionsInternal(true);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            ImmutableMap.of("watermark-column", "t1", "split-file-open-cost", "128000000"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
+  }
+
+  @Test
+  public void testWatermarkOptionsDescending() throws Exception {
+    List<Record> expected = testWatermarkOptionsInternal(false);

Review Comment:
   I don't quite understand here. Both test methods enable the watermark column; one expects the ascending order and the other expects the reverse order (see the toy sketch at the end of this message for my mental model of the split ordering).


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java:
##########
@@ -39,4 +62,76 @@ public void before() throws IOException {
         .getConfiguration()
         .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
   }
+
+  private Record generateRecord(Instant t1) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t1.getEpochSecond());

Review Comment:
   @pvary I didn't quite get the concern here.
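   Re: my ascending/descending question above, the toy sketch below is my mental model of what `SplitComparators.watermark` gives the `OrderedSplitAssignerFactory`. It is self-contained with simplified stand-in types, not the real Iceberg classes: splits get handed out in ascending order of the watermark extracted from the column stats, regardless of the order the files were appended in, which is why an expected reverse order surprises me.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WatermarkOrderSketch {
  // Simplified stand-in for a file split plus the min timestamp from its column stats.
  static final class Split {
    final String file;
    final long minTimestampMillis;

    Split(String file, long minTimestampMillis) {
      this.file = file;
      this.minTimestampMillis = minTimestampMillis;
    }
  }

  public static void main(String[] args) {
    List<Split> splits = new ArrayList<>();
    splits.add(new Split("file2-late.parquet", 1702382121000L));
    splits.add(new Split("file1-early.parquet", 1702382099000L));

    // Ordered assignment hands out the split with the lowest extracted watermark first,
    // so the "early" file is read before the "late" file even though it was added second.
    splits.sort(Comparator.comparingLong(s -> s.minTimestampMillis));
    splits.forEach(s -> System.out.println(s.file)); // file1-early.parquet, file2-late.parquet
  }
}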
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org