pvary commented on code in PR #9179:
URL: https://github.com/apache/iceberg/pull/9179#discussion_r1410623023


##########
docs/flink-queries.md:
##########
@@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks()
     "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
 ```
 
+### Emitting watermarks
+Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the
+[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
+feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).
+
+Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Timestamp columns are automatically converted to milliseconds since the Java epoch of
+1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns.
+
+The watermarks are generated based on column metrics stored for data files and emitted once per split.
+When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent
+combining multiple files to a single split.
+By default, the column metrics are collected for the first 100 columns of the table. Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+// For windowing
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Watermark using timestamp column
+            .watermarkColumn("timestamp_column")
+            .build(),
+        // Watermarks are generated by the source, no need to generate it manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            // Extract event timestamp from records
+            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+
+// For watermark alignment
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource source = IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Disable combining multiple files to a single split 
+            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
+            // Watermark using long column
+            .watermarkColumn("long_column")
+            .watermarkTimeUnit(TimeUnit.MILLI_SCALE)

Review Comment:
   I would keep these two as separate examples.
   If I understand correctly, @stevenzwu thinks that watermark alignment is the most important feature of this change, and @mas-chen thinks that ordering / windowing is more important.
   
   That is probably a good indication that both benefits are important 😄



