hililiwei opened a new pull request, #7638: URL: https://github.com/apache/iceberg/pull/7638
Co-authored-by: quanyingxue <[email protected]>

### Partition Commit

By default, the Iceberg Flink writer commits data on `Checkpoint`: when a `Checkpoint` completes, the newly written data is committed to the table metadata, regardless of which partition it belongs to. However, after a partition has been fully written, it is often necessary to notify downstream applications, for example by adding the partition to the metadata or by writing a `_SUCCESS` file into the partition directory. (Even when the table is stored in object storage, downstream applications may still rely on this file as a flag that drives the rest of the pipeline.)

The current default commit mode, which depends on `Checkpoint`, can be understood as using processing time to decide when a table partition is committed. This mode still has room for optimization, because developers may want to decouple it from the checkpoint cycle. The partition commit strategy in this PR can instead be understood as using event time to decide whether to commit a table partition.

- Policy: how to commit a partition. The built-in policies are `default` and `success-file`; you can also implement your own policies, for example one that merges small files.

**NOTE:** Partition commit only works with dynamic partition inserts.

#### Partition Commit Trigger

The partition commit trigger defines when a partition is committed:

| Option | Required | Default | Type | Description |
| :---------------------------------------- | :------- | :------ | :------- | :----------------------------------------------------------- |
| sink.partition-commit.enabled | optional | false | Boolean | Set to true to commit partitions based on the time extracted from partition values and the watermark. |
| sink.partition-commit.delay | optional | 0 s | Duration | A partition is not committed until this delay has passed after its partition time. For a daily partition this should be '1 d'; for an hourly partition, '1 h'. |
| sink.partition-commit.watermark-time-zone | optional | UTC | String | The time zone used to parse the long watermark value into a TIMESTAMP value. The parsed watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when `sink.partition-commit.enabled` is set to 'true'. If it is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this option is not set, users may see partitions committed several hours late. The default, 'UTC', is appropriate when the watermark is defined on a TIMESTAMP column or not defined at all. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The value is either a full name such as 'America/Los_Angeles' or a custom time zone ID such as 'GMT-08:00'. |

Committing partitions based on the time extracted from partition values and the watermark requires that your job generates watermarks and that the table is partitioned by time, for example hourly or daily partitions.

If you want downstream applications to see a partition only once its data is complete, your job generates watermarks, and the time can be extracted from partition values, configure:

- `'sink.partition-commit.enabled'='true'`
- `'sink.partition-commit.delay'='1h'` ('1h' for hourly partitions; choose the value according to your partition granularity)

This is the most accurate way to commit partitions: it tries to ensure that committed partitions are as complete as possible.

Late data processing: if a record belongs to a partition that has already been committed, the record is still written into that partition, and the commit of that partition is triggered again.

#### Partition Time Extractor

Time extractors define how a time is extracted from partition values.
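As an illustration only, the extraction performed for the `iceberg_table` in the full example (pattern `'$dt $hour:00:00'`, formatter `'yyyy-MM-dd HH:mm:ss'`) can be approximated in plain Flink SQL: substitute the partition values into the pattern, then parse the resulting string with the formatter. This assumes the default extractor does simple string substitution followed by parsing; it is a sketch, not the extractor implemented in this PR.

```sql
-- Illustration only: approximates the partition time the default extractor would
-- derive for iceberg_table, assuming it substitutes partition values into the
-- pattern '$dt $hour:00:00' and parses the result with 'yyyy-MM-dd HH:mm:ss'.
SELECT DISTINCT
  dt,
  `hour`,
  TO_TIMESTAMP(CONCAT(dt, ' ', `hour`, ':00:00'), 'yyyy-MM-dd HH:mm:ss') AS partition_time
FROM iceberg_table;
```

For a hypothetical partition `dt='2023-05-20'`, `hour='12'`, this yields `2023-05-20 12:00:00`. With `'sink.partition-commit.delay'='1h'`, and assuming the trigger commits a partition once the watermark passes the partition time plus the delay, that partition would be committed after the watermark passes `2023-05-20 13:00:00`.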
| Option | Required | Default | Type | Description |
| :------------------------------------------- | :------- | :------------------ | :----- | :----------------------------------------------------------- |
| partition.time-extractor.timestamp-pattern | optional | (none) | String | Builds a timestamp string from partition fields. By default, a 'yyyy-MM-dd HH:mm:ss' timestamp is read from the first partition field. To extract the timestamp from a single partition field 'dt', configure '$dt'. To extract it from multiple partition fields, say 'year', 'month', 'day' and 'hour', configure '$year-$month-$day $hour:00:00'. To extract it from two partition fields 'dt' and 'hour', configure '$dt $hour:00:00'. |
| partition.time-extractor.timestamp-formatter | optional | yyyy-MM-dd HH:mm:ss | String | The formatter that parses the partition timestamp string, as built by 'partition.time-extractor.timestamp-pattern', into a timestamp. For example, if the partition timestamp is extracted from the partition fields 'year', 'month' and 'day', you can set 'partition.time-extractor.timestamp-pattern' to '$year$month$day' and `partition.time-extractor.timestamp-formatter` to 'yyyyMMdd'. The default formatter is 'yyyy-MM-dd HH:mm:ss'. The format is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). |

The default extractor is based on a timestamp pattern composed of your partition fields.

#### Partition Commit Policy

The partition commit policy defines what action is taken when partitions are committed.
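A minimal sketch of a daily-partitioned table that uses both built-in policies at once, assuming the options behave as described in this PR. The table name `iceberg_daily_table` and its columns are hypothetical. The pattern appends `00:00:00` so that the string parses with a full date-time formatter.

```sql
-- Hypothetical daily-partitioned table; option names are taken from this PR.
CREATE TABLE iceberg_daily_table (
  id STRING,
  name STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'format-version'='2',
  'sink.partition-commit.enabled'='true',
  -- daily partitions: wait one day past the partition time before committing
  'sink.partition-commit.delay'='1 d',
  -- apply both built-in policies: add the partition to the metadata and write a marker file
  'sink.partition-commit.policy.kind'='default,success-file',
  'sink.partition-commit.success-file.name'='_SUCCESS',
  -- the pattern must yield a full timestamp string that the formatter can parse
  'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',
  'partition.time-extractor.timestamp-formatter'='yyyy-MM-dd HH:mm:ss'
);
```

A user-defined policy would instead set the kind to `custom` and point `sink.partition-commit.policy.class` at a class implementing the PartitionCommitPolicy interface.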
| Option | Required | Default | Type | Description |
| :-------------------------------------- | :------- | :------- | :----- | :----------------------------------------------------------- |
| sink.partition-commit.policy.kind | optional | default | String | The policy used to commit a partition, i.e. to notify downstream applications that the partition has finished writing and is ready to be read. `default`: add the partition to the metadata. `success-file`: add a '_SUCCESS' file to the partition directory. `custom`: create the commit policy from the configured policy class. Multiple policies can be configured, e.g. 'default,success-file'. |
| sink.partition-commit.policy.class | optional | (none) | String | The partition commit policy class, which must implement the PartitionCommitPolicy interface. Only takes effect with the `custom` commit policy. |
| sink.partition-commit.success-file.name | optional | _SUCCESS | String | The file name used by the `success-file` partition commit policy. Defaults to '_SUCCESS'. |

### Full example

```sql
CREATE TABLE kafka_table (
  id STRING,
  name STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE iceberg_table (
  id STRING,
  name STRING,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'format-version'='2',
  'sink.partition-commit.enabled'='true',
  'sink.partition-commit.delay'='1h',
  'sink.partition-commit.policy.kind'='success-file',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'partition.time-extractor.timestamp-formatter'='yyyy-MM-dd HH:mm:ss'
);

INSERT INTO iceberg_table
SELECT id, name, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')
FROM kafka_table;

-- streaming read: monitor the table for new snapshots every 3 seconds
SELECT * FROM iceberg_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s') */;
```

### To do

- When a checkpoint is triggered, store the unclosed writers in the state.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
