hililiwei opened a new pull request, #7638:
URL: https://github.com/apache/iceberg/pull/7638

   Co-authored-by: quanyingxue <[email protected]>
   
   ### Partition Commit 
   
   By default, the Iceberg Flink writer commits data on `Checkpoint`: when a checkpoint completes, all newly written data is committed to the table metadata, regardless of which partition it belongs to. However, once a partition has been completely written, it is often necessary to notify downstream applications, for example by adding the partition to the metadata or by writing a `_SUCCESS` file into the partition directory. (Even when data is stored in object storage, downstream applications may still rely on this file as a marker to drive the progress of the whole pipeline.)
   
   The current default commit mode, which depends on `Checkpoint`, can be understood as using processing time to decide when table partitions are committed. (This mode still has room for improvement, because developers may want to decouple commits from the checkpoint interval.) The partition commit strategy in this PR, by contrast, uses event time to decide whether a table partition should be committed.
   
   - Policy: how to commit a partition. Built-in policies support committing the partition to the metadata (default) and writing success files; you can also implement your own policies, such as merging small files.
   
   **NOTE:** Partition commit only works with dynamic partition inserts.
   
   #### Partition Commit Trigger
   
   The partition commit trigger defines when a partition is committed:
   
   | Option                                    | Required | Default | Type     | Description |
   | :---------------------------------------- | :------- | :------ | :------- | :---------- |
   | sink.partition-commit.enabled             | optional | false   | Boolean  | Set to true to commit partitions based on the time extracted from the partition values and the watermark. |
   | sink.partition-commit.delay               | optional | 0 s     | Duration | The partition is not committed until this delay has elapsed. For a daily partition this should be '1 d'; for an hourly partition, '1 h'. |
   | sink.partition-commit.watermark-time-zone | optional | UTC     | String   | The time zone used to convert the long watermark value to a TIMESTAMP value. The parsed watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when `sink.partition-commit.enabled` is set to 'true'. If it is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this option is not set, partitions may be committed several hours late. The default value 'UTC' means the watermark is defined on a TIMESTAMP column or is not defined. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The value is either a full name such as 'America/Los_Angeles' or a custom time zone ID such as 'GMT-08:00'. |
   
   When enabled, a partition is committed based on the time extracted from its partition values and the current watermark. This requires the job to generate watermarks and the table to be partitioned by time, such as hourly or daily partitions.
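   As a rough illustration of the trigger, the Java sketch below assumes the rule used by Flink's filesystem sink: a partition becomes committable once the watermark is later than the partition time plus `sink.partition-commit.delay`, where the zone-less partition time is interpreted in `sink.partition-commit.watermark-time-zone`. `PartitionCommitCheck` and `isCommittable` are hypothetical names, not code from this PR.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical sketch of the event-time commit decision. Assumption: modeled on
// Flink's filesystem sink, where a partition is committable once the watermark
// has passed the partition time plus the configured delay.
final class PartitionCommitCheck {

  /**
   * @param partitionTime time extracted from the partition values (no zone)
   * @param watermarkMillis current watermark as epoch milliseconds
   * @param delay value of 'sink.partition-commit.delay'
   * @param watermarkZone value of 'sink.partition-commit.watermark-time-zone'
   */
  static boolean isCommittable(
      LocalDateTime partitionTime, long watermarkMillis, Duration delay, ZoneId watermarkZone) {
    // Convert the partition time to epoch millis in the watermark's time zone,
    // then compare it (plus the delay) against the watermark.
    long partitionMillis = partitionTime.atZone(watermarkZone).toInstant().toEpochMilli();
    return watermarkMillis > partitionMillis + delay.toMillis();
  }

  public static void main(String[] args) {
    ZoneId utc = ZoneId.of("UTC");
    LocalDateTime hour10 = LocalDateTime.of(2023, 5, 18, 10, 0, 0);
    long at1059 = LocalDateTime.of(2023, 5, 18, 10, 59, 59).atZone(utc).toInstant().toEpochMilli();
    long at1101 = LocalDateTime.of(2023, 5, 18, 11, 0, 1).atZone(utc).toInstant().toEpochMilli();
    System.out.println(isCommittable(hour10, at1059, Duration.ofHours(1), utc)); // false
    System.out.println(isCommittable(hour10, at1101, Duration.ofHours(1), utc)); // true
  }
}
```

   With an hourly partition and a 1h delay, the partition `hour=10` is only committed after the watermark passes 11:00, i.e. once all data for that hour is expected to have arrived.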
   
   If you want downstream applications to see a partition only once its data is complete, your job generates watermarks, and the time can be extracted from the partition values, configure:
   
   - `'sink.partition-commit.enabled'='true'`
   - `'sink.partition-commit.delay'='1h'` (use '1h' for hourly partitions; the value depends on your partition granularity)
   
   This is the most accurate way to commit partitions, and it tries to ensure that committed partitions are as complete as possible.
   
   Late data processing: when a record belongs to a partition that has already been committed, the record is still written into that partition, and the commit of that partition is triggered again.
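   The late-data behavior can be sketched with a hypothetical tracker, assuming the same watermark-plus-delay rule as Flink's filesystem sink (none of these names come from the PR): every written record marks its partition as pending, and a partition committed earlier simply becomes pending again when a late record arrives, so the next watermark check commits it a second time.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tracker of partitions waiting to be committed.
final class PendingPartitions {
  private final Map<String, LocalDateTime> pending = new TreeMap<>();
  private final Duration delay;
  private final ZoneId zone;

  PendingPartitions(Duration delay, ZoneId zone) {
    this.delay = delay;
    this.zone = zone;
  }

  /** Called whenever a record is written; re-marks already committed partitions. */
  void onRecordWritten(String partition, LocalDateTime partitionTime) {
    pending.put(partition, partitionTime);
  }

  /** Returns and removes the partitions that can be committed at this watermark. */
  List<String> committable(long watermarkMillis) {
    List<String> ready = new ArrayList<>();
    Iterator<Map.Entry<String, LocalDateTime>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, LocalDateTime> e = it.next();
      long partitionMillis = e.getValue().atZone(zone).toInstant().toEpochMilli();
      if (watermarkMillis > partitionMillis + delay.toMillis()) {
        ready.add(e.getKey());
        it.remove();
      }
    }
    return ready;
  }

  public static void main(String[] args) {
    ZoneId utc = ZoneId.of("UTC");
    PendingPartitions tracker = new PendingPartitions(Duration.ofHours(1), utc);
    String p = "dt=2023-05-18/hour=10";
    LocalDateTime h10 = LocalDateTime.of(2023, 5, 18, 10, 0);
    long wm = LocalDateTime.of(2023, 5, 18, 11, 30).atZone(utc).toInstant().toEpochMilli();

    tracker.onRecordWritten(p, h10);
    System.out.println(tracker.committable(wm)); // [dt=2023-05-18/hour=10]
    System.out.println(tracker.committable(wm)); // []
    tracker.onRecordWritten(p, h10); // late record for an already committed partition
    System.out.println(tracker.committable(wm)); // [dt=2023-05-18/hour=10]
  }
}
```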
   
   #### Partition Time Extractor 
   
   Time extractors define how the time is extracted from partition values.
   
   | Option                                       | Required | Default             | Type   | Description |
   | :------------------------------------------- | :------- | :------------------ | :----- | :---------- |
   | partition.time-extractor.timestamp-pattern   | optional | (none)              | String | Builds a timestamp string from partition fields. By default, a 'yyyy-MM-dd HH:mm:ss' value is taken from the first field. If the timestamp should be extracted from a single partition field 'dt', configure '$dt'. If it should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', configure '$year-$month-$day $hour:00:00'. If it should be extracted from two partition fields 'dt' and 'hour', configure '$dt $hour:00:00'. |
   | partition.time-extractor.timestamp-formatter | optional | yyyy-MM-dd HH:mm:ss | String | The formatter used to parse the partition timestamp string, built by 'partition.time-extractor.timestamp-pattern', into a timestamp. For example, if the partition timestamp is extracted from the partition fields 'year', 'month' and 'day', you can set 'partition.time-extractor.timestamp-pattern' to '$year$month$day' and `partition.time-extractor.timestamp-formatter` to 'yyyyMMdd'. The default is 'yyyy-MM-dd HH:mm:ss'. The formatter is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). |
   
   The default extractor is based on a timestamp pattern composed of your 
partition fields. 
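   A minimal sketch of the default pattern-based extractor, assuming it substitutes each `$field` placeholder with the corresponding partition value and parses the result with a `java.time.format.DateTimeFormatter` built from the configured formatter, falling back to midnight for date-only formatters such as 'yyyyMMdd'. `PartitionTimeExtractorSketch` is a hypothetical name, not the class used in this PR.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the default timestamp-pattern extractor.
final class PartitionTimeExtractorSketch {

  /**
   * @param partitionValues partition field name -> value, in partition-spec order
   * @param pattern 'partition.time-extractor.timestamp-pattern', or null for the default
   * @param formatter 'partition.time-extractor.timestamp-formatter'
   */
  static LocalDateTime extract(Map<String, String> partitionValues, String pattern, String formatter) {
    String timestamp;
    if (pattern == null) {
      // Default: the first partition field already holds the full timestamp string.
      timestamp = partitionValues.values().iterator().next();
    } else {
      timestamp = pattern;
      // Substitute longer names first so that "$day" cannot clobber "$dayofweek".
      List<String> keys = new ArrayList<>(partitionValues.keySet());
      keys.sort((a, b) -> Integer.compare(b.length(), a.length()));
      for (String key : keys) {
        timestamp = timestamp.replace("$" + key, partitionValues.get(key));
      }
    }
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern(formatter);
    try {
      return LocalDateTime.parse(timestamp, fmt);
    } catch (DateTimeParseException e) {
      // Date-only formatters such as 'yyyyMMdd' carry no time fields: use midnight.
      return LocalDate.parse(timestamp, fmt).atStartOfDay();
    }
  }

  public static void main(String[] args) {
    Map<String, String> hourly = new LinkedHashMap<>();
    hourly.put("dt", "2023-05-18");
    hourly.put("hour", "10");
    System.out.println(extract(hourly, "$dt $hour:00:00", "yyyy-MM-dd HH:mm:ss")); // 2023-05-18T10:00

    Map<String, String> daily = new LinkedHashMap<>();
    daily.put("year", "2023");
    daily.put("month", "05");
    daily.put("day", "18");
    System.out.println(extract(daily, "$year$month$day", "yyyyMMdd")); // 2023-05-18T00:00
  }
}
```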
   
   
   
   #### Partition Commit Policy
   
   The partition commit policy defines what action is taken when partitions are 
committed.
   
   | Option                                  | Required | Default  | Type   | Description |
   | :-------------------------------------- | :------- | :------- | :----- | :---------- |
   | sink.partition-commit.policy.kind       | optional | default  | String | The policy used to commit a partition, i.e. to notify downstream applications that the partition has finished writing and is ready to be read. default: add the partition to the metadata. success-file: add a '_SUCCESS' file to the partition directory. custom: create a commit policy from the configured policy class. Multiple policies can be combined, e.g. 'default,success-file'. |
   | sink.partition-commit.policy.class      | optional | (none)   | String | The class implementing the PartitionCommitPolicy interface. Only takes effect with the custom commit policy. |
   | sink.partition-commit.success-file.name | optional | _SUCCESS | String | The file name for the success-file partition commit policy. The default is '_SUCCESS'. |
   
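   The following Java sketch shows how the success-file policy and a comma-separated `sink.partition-commit.policy.kind` might fit together. The PR only names a `PartitionCommitPolicy` interface; the `commit(String, Path)` signature, the `CommitPolicies` factory, and the println placeholder standing in for the default metadata policy are assumptions for illustration, and the custom kind is omitted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical factory that turns 'sink.partition-commit.policy.kind' into policies.
final class CommitPolicies {
  /** Builds the policy chain from e.g. 'default,success-file'. */
  static List<PartitionCommitPolicy> fromKind(String kind, String successFileName) {
    List<PartitionCommitPolicy> policies = new ArrayList<>();
    for (String k : kind.split(",")) {
      switch (k.trim()) {
        case "default":
          // Placeholder only: the real policy adds the partition to table metadata.
          policies.add((partition, dir) -> System.out.println("metadata commit: " + partition));
          break;
        case "success-file":
          policies.add(new SuccessFileCommitPolicy(successFileName));
          break;
        default:
          throw new IllegalArgumentException("Unsupported policy kind: " + k);
      }
    }
    return policies;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("iceberg_table").resolve("dt=2023-05-18");
    for (PartitionCommitPolicy p : fromKind("default,success-file", "_SUCCESS")) {
      p.commit("dt=2023-05-18", dir);
    }
    System.out.println(Files.exists(dir.resolve("_SUCCESS"))); // true
  }
}

// Hypothetical signature; the PR's actual interface may differ.
interface PartitionCommitPolicy {
  void commit(String partition, Path partitionDir) throws IOException;
}

// 'success-file': write an empty marker file into the partition directory.
final class SuccessFileCommitPolicy implements PartitionCommitPolicy {
  private final String fileName;

  SuccessFileCommitPolicy(String fileName) {
    this.fileName = fileName;
  }

  @Override
  public void commit(String partition, Path partitionDir) throws IOException {
    Files.createDirectories(partitionDir);
    // Creates the marker or truncates an existing one.
    Files.write(partitionDir.resolve(fileName), new byte[0]);
  }
}
```

   Because the marker is created or truncated rather than appended to, running the policy again when late data re-triggers a commit leaves the directory in the same state.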
   
   
   ### Full example
   
   
   
   ```sql
   CREATE TABLE kafka_table (
     id STRING,
     name STRING,
     ts TIMESTAMP(3),
     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
   ) WITH (...);
   
   CREATE TABLE iceberg_table (
     id STRING,
     name STRING,
     dt STRING,
     `hour` STRING
   ) PARTITIONED BY (dt, `hour`) WITH (
     'format-version'='2',
     'sink.partition-commit.enabled'='true',
     'sink.partition-commit.delay'='1h',
     'sink.partition-commit.policy.kind'='success-file',
     'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
     'partition.time-extractor.timestamp-formatter'='yyyy-MM-dd HH:mm:ss'
    );
   
   INSERT INTO iceberg_table 
   SELECT 
       id,
       name, 
       DATE_FORMAT(ts, 'yyyy-MM-dd'),
       DATE_FORMAT(ts, 'HH') 
   FROM kafka_table;
   
   -- streaming read: monitor the table for newly committed data every 3s
   SELECT * FROM iceberg_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s') */ ;
   ```
   
   
   ### To do
   
   - When a checkpoint is triggered, store the unclosed writers in state
   