stevenzwu commented on code in PR #9346: URL: https://github.com/apache/iceberg/pull/9346#discussion_r1445324248
########## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java: ########## @@ -109,4 +110,14 @@ private FlinkReadOptions() {} public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures"; public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION = ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3); + + public static final String WATERMARK_COLUMN = "watermark-column"; + public static final ConfigOption<String> WATERMARK_COLUMN_OPTION = + ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue(); + + public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit"; Review Comment: the name doesn't match the doc `watermark-column-timeunit`. ########## flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java: ########## @@ -18,25 +18,139 @@ */ package org.apache.iceberg.flink.source; +import static org.apache.iceberg.types.Types.NestedField.required; + import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.util.List; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.Test; /** Use the IcebergSource (FLIP-27) */ 
public class TestIcebergSourceSql extends TestSqlBase { + private static final Schema SCHEMA_TS = + new Schema( + required(1, "t1", Types.TimestampType.withoutZone()), + required(2, "t2", Types.LongType.get())); + @Override public void before() throws IOException { - Configuration tableConf = getTableEnv().getConfig().getConfiguration(); + TableEnvironment tableEnvironment = getTableEnv(); + Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true); + + tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1"); SqlHelpers.sql( - getTableEnv(), + tableEnvironment, "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", catalogResource.warehouse()); - SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog"); - getTableEnv() + SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog"); + + tableEnvironment .getConfig() .getConfiguration() .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); } + + private Record generateRecord(Instant t1, long t2) { + Record record = GenericRecord.create(SCHEMA_TS); + record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime()); + record.setField("t2", t2); + return record; + } + + /** Generates the records in the expected order, with respect to their datafile */ + private List<Record> generateExpectedRecords(boolean ascending) throws Exception { + Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS); + long baseTime = 1702382109000L; + + GenericAppenderHelper helper = + new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER); + + // File 1 - early timestamps, new longs + Record early1 = Review Comment: the naming of `early` vs `late` is inaccurate, as the two columns have different time order. 
########## flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java: ########## @@ -18,25 +18,139 @@ */ package org.apache.iceberg.flink.source; +import static org.apache.iceberg.types.Types.NestedField.required; + import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.util.List; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.Test; /** Use the IcebergSource (FLIP-27) */ public class TestIcebergSourceSql extends TestSqlBase { + private static final Schema SCHEMA_TS = + new Schema( + required(1, "t1", Types.TimestampType.withoutZone()), + required(2, "t2", Types.LongType.get())); + @Override public void before() throws IOException { - Configuration tableConf = getTableEnv().getConfig().getConfiguration(); + TableEnvironment tableEnvironment = getTableEnv(); + Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true); + + tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1"); SqlHelpers.sql( - getTableEnv(), + tableEnvironment, "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 
'warehouse'='%s')", catalogResource.warehouse()); - SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog"); - getTableEnv() + SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog"); + + tableEnvironment Review Comment: this can be changed to using `tableConf` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org