stevenzwu commented on code in PR #5967:
URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040205639
########## docs/flink-getting-started.md: ##########
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like this:
+
+```
+IcebergSource.forRowData()
+    .tableLoader(TableLoader.fromCatalog(...))
+    .assignerFactory(new SimpleSplitAssignerFactory())
+    .streaming(true)
+    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+    .startSnapshotId(3821550127947089987L)
+    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s")
+    .build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+    .getConfiguration()
+    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
+
+| Read option | Flink configuration | Table property | Default | Description |
+| ----------- | ------------------- | -------------- | ------- | ----------- |
+| snapshot-id | N/A | N/A | N/A | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive | case-sensitive | N/A | false | If true, match column name in a case sensitive way. |
+| as-of-timestamp | N/A | N/A | N/A | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| connector.iceberg.starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for FIP27 Source. |
+| start-snapshot-timestamp | N/A | N/A | N/A | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id | N/A | N/A | N/A | Start to read data from the specified snapshot-id. |
+| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
+| connector.iceberg.split-size | connector.iceberg.split-size | read.split.target-size | Table read.split.target-size | Target size when combining data input splits. |
+| connector.iceberg.split-lookback | connector.iceberg.split-file-open-cost | read.split.planning-lookback | Table read.split.planning-lookback | Number of bins to consider when combining input splits. |
+| connector.iceberg.split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | Table read.split.open-file-cost | The estimated cost to open a file, used as a minimum weight when combining splits. |
+| streaming | streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
+| monitor-interval | monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
+| connector.iceberg.include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
+| connector.iceberg.max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | If there are multiple new snapshots, configure the maximum number of snapshot forward at a time. |

Review Comment:
   the description is a little vague to me. what about `Max number of snapshots limited per split enumeration. Applicable only to streaming read.`
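For context on the precedence rule stated in the quoted doc (read option over Flink configuration over table property), here is a minimal sketch of the same option supplied at two levels. It is not from the PR: the `sample` table is hypothetical, the SQL-hint form follows the quoted doc, and whether the session-level key needs a `connector.iceberg.` prefix is one of the open questions in this very review.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadOptionPrecedenceSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Session-level default via Flink configuration. Treat the key spelling as an
    // assumption; the prefix question is discussed further down in this review.
    tEnv.getConfig().getConfiguration().setString("monitor-interval", "30s");

    // A SQL hint is a read option, so per the quoted doc it overrides the
    // session-level value for this query only. "sample" is a hypothetical
    // Iceberg table assumed to be registered in the current catalog.
    tEnv.executeSql("SELECT * FROM sample /*+ OPTIONS('monitor-interval'='10s') */");
  }
}
```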
########## docs/flink-getting-started.md: ##########
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like this:
+
+```
+IcebergSource.forRowData()
+    .tableLoader(TableLoader.fromCatalog(...))
+    .assignerFactory(new SimpleSplitAssignerFactory())
+    .streaming(true)
+    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+    .startSnapshotId(3821550127947089987L)
+    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s")
+    .build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+    .getConfiguration()
+    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
+
+| Read option | Flink configuration | Table property | Default | Description |
+| ----------- | ------------------- | -------------- | ------- | ----------- |
+| snapshot-id | N/A | N/A | N/A | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive | case-sensitive | N/A | false | If true, match column name in a case sensitive way. |
+| as-of-timestamp | N/A | N/A | N/A | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| connector.iceberg.starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for FIP27 Source. |
+| start-snapshot-timestamp | N/A | N/A | N/A | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id | N/A | N/A | N/A | Start to read data from the specified snapshot-id. |
+| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
+| connector.iceberg.split-size | connector.iceberg.split-size | read.split.target-size | Table read.split.target-size | Target size when combining data input splits. |
+| connector.iceberg.split-lookback | connector.iceberg.split-file-open-cost | read.split.planning-lookback | Table read.split.planning-lookback | Number of bins to consider when combining input splits. |
+| connector.iceberg.split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | Table read.split.open-file-cost | The estimated cost to open a file, used as a minimum weight when combining splits. |
+| streaming | streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
+| monitor-interval | monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
+| connector.iceberg.include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
+| connector.iceberg.max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | If there are multiple new snapshots, configure the maximum number of snapshot forward at a time. |
+| connector.iceberg.limit | connector.iceberg.limit | N/A | -1 | Limited output splits count. |

Review Comment:
   I thought the limit is for the number of rows, as in SQL like `select * from table limit 5`.
########## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java: ##########
@@ -219,6 +240,8 @@ public FlinkInputFormat buildFormat() {
     contextBuilder.planParallelism(
         readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
 
+    contextBuilder.settleConfig(table, readOptions, readableConfig);

Review Comment:
   `resolveConfig` is probably a little better.

########## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java: ##########
@@ -335,8 +365,30 @@ public Builder<T> exposeLocality(boolean newExposeLocality) {
       return this;
     }
 
+    /**
+     * Set the read properties for Flink source. View the supported properties in {@link
+     * FlinkReadOptions}
+     */
+    public Builder<T> set(String property, String value) {

Review Comment:
   personally, I would like to avoid multiple ways of doing the same thing, which can confuse users about which API they should use. In this case, I see you are also trying to keep it consistent with the sink, where a `set` method is provided. Hence, I am neutral on this.
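To make the trade-off in the comment above concrete, here is a minimal sketch (not from the PR) of the two call styles it weighs: the typed builder method from the existing getting-started example versus the string-keyed `set(...)` added in this change. The import paths and table location are assumptions.

```java
import java.time.Duration;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;

public class SetVersusTypedBuilderSketch {
  public static void main(String[] args) {
    // Hypothetical table location; any TableLoader would work the same way here.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/sample");

    // Style 1: typed builder method, as in the quoted getting-started example.
    IcebergSource<RowData> viaTypedSetter =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .streaming(true)
            .monitorInterval(Duration.ofSeconds(10))
            .build();

    // Style 2: string-keyed read option through the set(...) method this PR adds;
    // per the quoted docs the two sources end up configured identically.
    IcebergSource<RowData> viaSet =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .streaming(true)
            .set("monitor-interval", "10s")
            .build();
  }
}
```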
########## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java: ##########
@@ -261,6 +284,7 @@ public DataStream<RowData> build() {
   }
 
   public static boolean isBounded(Map<String, String> properties) {
-    return !ScanContext.builder().fromProperties(properties).build().isStreaming();
+    return !Boolean.parseBoolean(

Review Comment:
   can we use `PropertyUtil` here?
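A standalone sketch of what the `PropertyUtil` suggestion could look like. The bare `streaming` key and the `false` default are assumptions here; the PR presumably exposes the real constant and default through `FlinkReadOptions`.

```java
import java.util.Map;
import org.apache.iceberg.util.PropertyUtil;

public class IsBoundedSketch {
  // Hypothetical stand-in for the real option constant.
  private static final String STREAMING_KEY = "streaming";

  public static boolean isBounded(Map<String, String> properties) {
    // PropertyUtil falls back to the supplied default when the key is absent,
    // which keeps the "batch by default" behaviour explicit instead of relying
    // on Boolean.parseBoolean(null) happening to return false.
    return !PropertyUtil.propertyAsBoolean(properties, STREAMING_KEY, false);
  }
}
```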
########## flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java: ##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.TimeUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+
+public class FlinkReadConf {
+
+  private final FlinkConfParser confParser;
+
+  public FlinkReadConf(
+      Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
+    this.confParser = new FlinkConfParser(table, readOptions, readableConfig);
+  }
+
+  public Long snapshotId() {
+    return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional();
+  }
+
+  public boolean caseSensitive() {
+    return confParser
+        .booleanConf()
+        .option(FlinkReadOptions.CASE_SENSITIVE.key())
+        .flinkConfig(FlinkReadOptions.CASE_SENSITIVE)

Review Comment:
   as mentioned in the doc file, the prefix shouldn't be added for hint options. It should be added here only for `flinkConfig`.

########## flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java: ##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkSourceConfig extends TestFlinkTableSource {
+  private static final String TABLE = "test_table";
+
+  @Test
+  public void testFlinkSessionConfig() {
+    getTableEnv().getConfig().set("streaming", "true");
+    AssertHelpers.assertThrows(
+        "Should throw exception because of cannot set snapshot-id option for streaming reader",
+        IllegalArgumentException.class,
+        "Cannot set as-of-timestamp option for streaming reader",
+        () -> {
+          sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE);
+          return null;
+        });
+
+    List<Row> result =

Review Comment:
   nit: refactor the override hierarchy test out to a separate method. one method for one test scenario.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org