stevenzwu commented on code in PR #5967:
URL: https://github.com/apache/iceberg/pull/5967#discussion_r1040205639


##########
docs/flink-getting-started.md:
##########
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like this:
+
+```
+IcebergSource.forRowData()
+    .tableLoader(TableLoader.fromCatalog(...))
+    .assignerFactory(new SimpleSplitAssignerFactory())
+    .streaming(true)
+    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+    .startSnapshotId(3821550127947089987L)
+    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s")
+    .build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to the current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+    .getConfiguration()
+    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
+
+| Read option | Flink configuration | Table property | Default | Description |
+| ----------- | ------------------- | -------------- | ------- | ----------- |
+| snapshot-id | N/A | N/A | N/A | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive | case-sensitive | N/A | false | If true, match column names in a case-sensitive way. |
+| as-of-timestamp | N/A | N/A | N/A | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| connector.iceberg.starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for the FLIP-27 source. |
+| start-snapshot-timestamp | N/A | N/A | N/A | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id | N/A | N/A | N/A | Start to read data from the specified snapshot-id. |
+| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
+| connector.iceberg.split-size | connector.iceberg.split-size | read.split.target-size | Table read.split.target-size | Target size when combining data input splits. |
+| connector.iceberg.split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | Table read.split.planning-lookback | Number of bins to consider when combining input splits. |
+| connector.iceberg.split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | Table read.split.open-file-cost | The estimated cost to open a file, used as a minimum weight when combining splits. |
+| streaming | streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
+| monitor-interval | monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
+| connector.iceberg.include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
+| connector.iceberg.max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | If there are multiple new snapshots, configure the maximum number of snapshots forward at a time. |

Review Comment:
   the description is a little vague to me. what about `Max number of snapshots limited per split enumeration. Applicable only to streaming read.`



##########
docs/flink-getting-started.md:
##########
@@ -683,7 +683,58 @@ env.execute("Test Iceberg DataStream");
 OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
 {{< /hint >}}
 
-## Write options
+## Options
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource, like this:
+
+```
+IcebergSource.forRowData()
+    .tableLoader(TableLoader.fromCatalog(...))
+    .assignerFactory(new SimpleSplitAssignerFactory())
+    .streaming(true)
+    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+    .startSnapshotId(3821550127947089987L)
+    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s")
+    .build()
+```
+For Flink SQL, read options can be passed in via SQL hints like this:
+```
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can be passed in via Flink configuration, which will be applied to the current session. Note that not all options support this mode.
+
+```
+env.getConfig()
+    .getConfiguration()
+    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
+
+| Read option | Flink configuration | Table property | Default | Description |
+| ----------- | ------------------- | -------------- | ------- | ----------- |
+| snapshot-id | N/A | N/A | N/A | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive | case-sensitive | N/A | false | If true, match column names in a case-sensitive way. |
+| as-of-timestamp | N/A | N/A | N/A | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| connector.iceberg.starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for the FLIP-27 source. |
+| start-snapshot-timestamp | N/A | N/A | N/A | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id | N/A | N/A | N/A | Start to read data from the specified snapshot-id. |
+| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
+| connector.iceberg.split-size | connector.iceberg.split-size | read.split.target-size | Table read.split.target-size | Target size when combining data input splits. |
+| connector.iceberg.split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | Table read.split.planning-lookback | Number of bins to consider when combining input splits. |
+| connector.iceberg.split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | Table read.split.open-file-cost | The estimated cost to open a file, used as a minimum weight when combining splits. |
+| streaming | streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
+| monitor-interval | monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
+| connector.iceberg.include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
+| connector.iceberg.max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | If there are multiple new snapshots, configure the maximum number of snapshots forward at a time. |
+| connector.iceberg.limit | connector.iceberg.limit | N/A | -1 | Limited output splits count. |

Review Comment:
   I thought the limit is for the number of rows for SQL like `select * from table limit 5`



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -219,6 +240,8 @@ public FlinkInputFormat buildFormat() {
       contextBuilder.planParallelism(
           readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
 
+      contextBuilder.settleConfig(table, readOptions, readableConfig);

Review Comment:
   `resolveConfig` is probably a little better.



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -335,8 +365,30 @@ public Builder<T> exposeLocality(boolean newExposeLocality) {
       return this;
     }
 
+    /**
+     * Set the read properties for Flink source. View the supported properties in {@link
+     * FlinkReadOptions}
+     */
+    public Builder<T> set(String property, String value) {

Review Comment:
   Personally, I would like to avoid having multiple ways of doing the same thing, which can confuse users about which API they should use. In this case, I see you are also trying to keep it consistent with the sink, where a `set` method is provided. Hence, I am neutral on this.
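
   For illustration, the two equivalent styles would look roughly like this (a sketch only; `tableLoader` is a placeholder, and the `streaming` key is taken from the read options table above):

   ```
   // dedicated builder method
   IcebergSource.forRowData()
       .tableLoader(tableLoader)
       .streaming(true)
       .build();

   // generic set(String, String) with the same effect
   IcebergSource.forRowData()
       .tableLoader(tableLoader)
       .set("streaming", "true")
       .build();
   ```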



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -261,6 +284,7 @@ public DataStream<RowData> build() {
   }
 
   public static boolean isBounded(Map<String, String> properties) {
-    return !ScanContext.builder().fromProperties(properties).build().isStreaming();
+    return !Boolean.parseBoolean(

Review Comment:
   can we use `PropertyUtil` here?
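
   A possible shape (sketch only; the literal "streaming" key is from the read options table above, and the real constant from `FlinkReadOptions` would be preferable):

   ```
   import org.apache.iceberg.util.PropertyUtil;

   public static boolean isBounded(Map<String, String> properties) {
     // defaults to batch (bounded) when the option is absent
     return !PropertyUtil.propertyAsBoolean(properties, "streaming", false);
   }
   ```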



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.TimeUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+
+public class FlinkReadConf {
+
+  private final FlinkConfParser confParser;
+
+  public FlinkReadConf(
+      Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
+    this.confParser = new FlinkConfParser(table, readOptions, readableConfig);
+  }
+
+  public Long snapshotId() {
+    return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional();
+  }
+
+  public boolean caseSensitive() {
+    return confParser
+        .booleanConf()
+        .option(FlinkReadOptions.CASE_SENSITIVE.key())
+        .flinkConfig(FlinkReadOptions.CASE_SENSITIVE)

Review Comment:
   As mentioned in the doc file, the prefix shouldn't be added for hint options. It should be added here only for `flinkConfig`.
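
   I.e. something along these lines (a rough sketch; the plain string key is only illustrative, and an unprefixed constant should be used if one exists):

   ```
   public boolean caseSensitive() {
     return confParser
         .booleanConf()
         // hint option: plain key without the "connector.iceberg." prefix
         .option("case-sensitive")
         // Flink session config: keeps the prefixed ConfigOption
         .flinkConfig(FlinkReadOptions.CASE_SENSITIVE)
         .defaultValue(false)
         .parse();
   }
   ```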



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkSourceConfig extends TestFlinkTableSource {
+  private static final String TABLE = "test_table";
+
+  @Test
+  public void testFlinkSessionConfig() {
+    getTableEnv().getConfig().set("streaming", "true");
+    AssertHelpers.assertThrows(
+        "Should throw exception because of cannot set snapshot-id option for 
streaming reader",
+        IllegalArgumentException.class,
+        "Cannot set as-of-timestamp option for streaming reader",
+        () -> {
+          sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE);
+          return null;
+        });
+
+    List<Row> result =

Review Comment:
   nit: refactor the override hierarchy test out to a separate method. One method per test scenario.
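
   For example, roughly (a sketch; method names and the second scenario's details are illustrative):

   ```
   @Test
   public void testStreamingReadRejectsAsOfTimestamp() {
     getTableEnv().getConfig().set("streaming", "true");
     AssertHelpers.assertThrows(
         "Should throw exception because as-of-timestamp option cannot be set for streaming reader",
         IllegalArgumentException.class,
         "Cannot set as-of-timestamp option for streaming reader",
         () -> {
           sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE);
           return null;
         });
   }

   @Test
   public void testReadOptionOverridesSessionConfig() {
     // session config requests streaming, the hint switches it off for this query
     getTableEnv().getConfig().set("streaming", "true");
     List<Row> result = sql("SELECT * FROM %s /*+ OPTIONS('streaming'='false')*/", TABLE);
     Assert.assertNotNull("Query with hint override should run", result);
   }
   ```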



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

