stevenzwu commented on code in PR #9179: URL: https://github.com/apache/iceberg/pull/9179#discussion_r1409630306
########## docs/flink-queries.md: ########## @@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks() "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); ``` +### Emitting watermarks +Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the +[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment) +feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/). Review Comment: I don't think we should call out `windows` specifically for the benefit of emitting watermark from source itself. any event time and watermark strategy will have windows triggers ########## docs/flink-queries.md: ########## @@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks() "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); ``` +### Emitting watermarks +Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the +[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment) +feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/). + +Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`. +The supported column types are `timestamp`, `timestamptz` and `long`. +Timestamp columns are automatically converted to milliseconds since the Java epoch of +1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns. + +The watermarks are generated based on column metrics stored for data files and emitted once per split. +When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent +combining multiple files to a single split. +By default, the column metrics are collected for the first 100 columns of the table. Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed. Review Comment: break off this line to a new paragraph. ########## docs/flink-queries.md: ########## @@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks() "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); ``` +### Emitting watermarks +Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the +[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment) +feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/). + +Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`. +The supported column types are `timestamp`, `timestamptz` and `long`. +Timestamp columns are automatically converted to milliseconds since the Java epoch of +1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns. Review Comment: > Use `watermarkTimeUnit` to configure the conversion for long columns. Before this sentence, maybe elaborate a little more. ``` Iceberg `timestamp` or `timestamptz` inherently contains the time precision. So there is no need to specify the time unit. But `long` type column doesn't contain time unit information. Use `watermarkTimeUnit` to configure the conversion for long columns. ``` ########## docs/flink-queries.md: ########## @@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks() "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); ``` +### Emitting watermarks +Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the +[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment) +feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/). + +Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`. +The supported column types are `timestamp`, `timestamptz` and `long`. +Timestamp columns are automatically converted to milliseconds since the Java epoch of +1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns. + +The watermarks are generated based on column metrics stored for data files and emitted once per split. +When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent +combining multiple files to a single split. +By default, the column metrics are collected for the first 100 columns of the table. Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed. + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); +TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path"); + +// For windowing +DataStream<RowData> stream = + env.fromSource( + IcebergSource.forRowData() + .tableLoader(tableLoader) + // Watermark using timestamp column + .watermarkColumn("timestamp_column") + .build(), + // Watermarks are generated by the source, no need to generate it manually + WatermarkStrategy.<RowData>noWatermarks() + // Extract event timestamp from records + .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + +// For watermark alignment +DataStream<RowData> stream = + env.fromSource( + IcebergSource source = IcebergSource.forRowData() + .tableLoader(tableLoader) + // Disable combining multiple files to a single split + .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT)) + // Watermark using long column + .watermarkColumn("long_column") + .watermarkTimeUnit(TimeUnit.MILLI_SCALE) + .build(), + // Watermarks are generated by the source, no need to generate it manually + WatermarkStrategy.<RowData>noWatermarks() + .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift), + SOURCE_NAME, + TypeInformation.of(RowData.class)); +``` + ## Options Review Comment: we also need to update the read options section. https://iceberg.apache.org/docs/1.3.0/flink-configuration/#read-options ########## docs/flink-queries.md: ########## @@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks() "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); ``` +### Emitting watermarks +Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the +[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment) +feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/). + +Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`. +The supported column types are `timestamp`, `timestamptz` and `long`. +Timestamp columns are automatically converted to milliseconds since the Java epoch of +1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns. + +The watermarks are generated based on column metrics stored for data files and emitted once per split. +When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent +combining multiple files to a single split. Review Comment: expand a little bit after this. ``` The watermarks are generated based on column metrics stored for data files and emitted once per split. If multiple smaller files with different time ranges are combined into a single split, it can increase the out-of-orderliness and extra data buffering in the Flink state. The main purpose of watermark alignment is to reduce out-of-orderliness and excess data buffering in the Flink state. Hence it is recommended to set `read.split.open-file-cost` to a very large value to prevent combining multiple smaller files into a single split. ``` ########## docs/flink-queries.md: ########## @@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks() "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); ``` +### Emitting watermarks +Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the +[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment) +feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/). + +Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`. +The supported column types are `timestamp`, `timestamptz` and `long`. +Timestamp columns are automatically converted to milliseconds since the Java epoch of +1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns. + +The watermarks are generated based on column metrics stored for data files and emitted once per split. +When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent +combining multiple files to a single split. +By default, the column metrics are collected for the first 100 columns of the table. Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed. + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); +TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path"); + +// For windowing Review Comment: I would call this `for timestamp column` ########## docs/flink-queries.md: ########## @@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks() "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); ``` +### Emitting watermarks +Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the +[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment) +feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/). Review Comment: I would also remove `prevent runaway readers`, as it may not be very clear to users what "runaway readers" mean ########## docs/flink-queries.md: ########## @@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks() "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); ``` +### Emitting watermarks +Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the +[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment) +feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/). + +Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`. +The supported column types are `timestamp`, `timestamptz` and `long`. +Timestamp columns are automatically converted to milliseconds since the Java epoch of +1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns. + +The watermarks are generated based on column metrics stored for data files and emitted once per split. +When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent +combining multiple files to a single split. +By default, the column metrics are collected for the first 100 columns of the table. Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed. + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); +TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path"); + +// For windowing +DataStream<RowData> stream = + env.fromSource( + IcebergSource.forRowData() + .tableLoader(tableLoader) + // Watermark using timestamp column + .watermarkColumn("timestamp_column") + .build(), + // Watermarks are generated by the source, no need to generate it manually + WatermarkStrategy.<RowData>noWatermarks() + // Extract event timestamp from records + .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()), + SOURCE_NAME, + TypeInformation.of(RowData.class)); + +// For watermark alignment Review Comment: I would call this for `long time column` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org