This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new ee4c041444 [doc](flink) add flink delete column from kafka specified columns (#20545)
ee4c041444 is described below

commit ee4c041444ca0709987ec88f76082c55f7279cf6
Author: wudi <676366...@qq.com>
AuthorDate: Thu Jun 8 13:42:27 2023 +0800

    [doc](flink) add flink delete column from kafka specified columns (#20545)
---
 docs/en/docs/ecosystem/flink-doris-connector.md    | 39 ++++++++++++++++++++++
 docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 39 ++++++++++++++++++++++
 2 files changed, 78 insertions(+)

diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md b/docs/en/docs/ecosystem/flink-doris-connector.md
index 48c67b713d..e3671ceac9 100644
--- a/docs/en/docs/ecosystem/flink-doris-connector.md
+++ b/docs/en/docs/ecosystem/flink-doris-connector.md
@@ -388,6 +388,45 @@ For the update of the primary key column, FlinkCDC will send DELETE and INSERT e
 ### Example
 The Flink program can refer to the CDC synchronization example above. After the task is successfully submitted, execute the Update primary key column statement (`update student set id = '1002' where id = '1001'`) on the MySQL side to modify the data in Doris .
 
+## Use Flink to delete data based on specified columns
+
+Messages in Kafka often use a specific field to mark the operation type, for example `{"op_type":"delete","data":{...}}`. For such data, rows with `op_type=delete` should be deleted from Doris.
+
+By default, DorisSink distinguishes the event type based on RowKind. With CDC sources the event type is available directly, and the hidden column `__DORIS_DELETE_SIGN__` is assigned to perform the deletion. With Kafka, the event type has to be determined by business logic, and the value of the hidden column must be passed in explicitly.
+
+### Example
+
+```sql
+-- Example upstream data: {"op_type":"delete","data":{"id":1,"name":"zhangsan"}}
+CREATE TABLE KAFKA_SOURCE(
+  data STRING,
+  op_type STRING
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+
+CREATE TABLE DORIS_SINK(
+  id INT,
+  name STRING,
+  __DORIS_DELETE_SIGN__ INT
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'table.identifier' = 'db.table',
+  'username' = 'root',
+  'password' = '',
+  'sink.enable-delete' = 'false',        -- false: do not derive the event type from RowKind
+  'sink.properties.columns' = 'id,name,__DORIS_DELETE_SIGN__'  -- explicitly specify the Stream Load import columns
+);
+
+INSERT INTO DORIS_SINK
+SELECT CAST(json_value(data,'$.id') AS INT) as id,
+json_value(data,'$.name') as name,
+if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
+from KAFKA_SOURCE;
+```
+
 ## Java example
 
 `samples/doris-demo/` An example of the Java version is provided below for reference, see [here](https://github.com/apache/doris/tree/master/samples/doris-demo/)
diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
index d0bf80d677..8fc3061979 100644
--- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
+++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
@@ -383,6 +383,45 @@ The underlying collection tool of Flink CDC is Debezium; Debezium internally uses the op field to mark
 ### Usage
 The Flink program can follow the CDC synchronization example above. After the job is submitted successfully, execute a statement that updates the primary key column (`update student set id = '1002' where id = '1001'`) on the MySQL side to modify the data in Doris.
 
+## Use Flink to delete data based on specified columns
+
+Messages in Kafka often use a specific field to mark the operation type, for example `{"op_type":"delete","data":{...}}`. For such data, rows with `op_type=delete` should be deleted.
+
+By default, DorisSink distinguishes the event type based on RowKind. With CDC sources the event type is available directly, and the hidden column `__DORIS_DELETE_SIGN__` is assigned to perform the deletion. With Kafka, the event type has to be determined by business logic, and the value of the hidden column must be passed in explicitly.
+
+### Usage
+
+```sql
+-- Example upstream data: {"op_type":"delete","data":{"id":1,"name":"zhangsan"}}
+CREATE TABLE KAFKA_SOURCE(
+  data STRING,
+  op_type STRING
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+
+CREATE TABLE DORIS_SINK(
+  id INT,
+  name STRING,
+  __DORIS_DELETE_SIGN__ INT
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'table.identifier' = 'db.table',
+  'username' = 'root',
+  'password' = '',
+  'sink.enable-delete' = 'false',        -- false: do not derive the event type from RowKind
+  'sink.properties.columns' = 'id,name,__DORIS_DELETE_SIGN__'  -- explicitly specify the Stream Load import columns
+);
+
+INSERT INTO DORIS_SINK
+SELECT CAST(json_value(data,'$.id') AS INT) as id,
+json_value(data,'$.name') as name,
+if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
+from KAFKA_SOURCE;
+```
+
 ## Java example
 
 A Java example is provided under `samples/doris-demo/` for reference; see [here](https://github.com/apache/doris/tree/master/samples/doris-demo/)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org