This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ee4c041444 [doc](flink) add flink delete column from kafka specified 
columns (#20545)
ee4c041444 is described below

commit ee4c041444ca0709987ec88f76082c55f7279cf6
Author: wudi <676366...@qq.com>
AuthorDate: Thu Jun 8 13:42:27 2023 +0800

    [doc](flink) add flink delete column from kafka specified columns (#20545)
---
 docs/en/docs/ecosystem/flink-doris-connector.md    | 39 ++++++++++++++++++++++
 docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 39 ++++++++++++++++++++++
 2 files changed, 78 insertions(+)

diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md 
b/docs/en/docs/ecosystem/flink-doris-connector.md
index 48c67b713d..e3671ceac9 100644
--- a/docs/en/docs/ecosystem/flink-doris-connector.md
+++ b/docs/en/docs/ecosystem/flink-doris-connector.md
@@ -388,6 +388,45 @@ For the update of the primary key column, FlinkCDC will 
send DELETE and INSERT e
 ### Example
 The Flink program can follow the CDC synchronization example above. After 
the job is submitted successfully, execute a statement that updates the 
primary key column (`update student set id = '1002' where id = '1001'`) on the 
MySQL side to modify the data in Doris.
 
+## Use Flink to delete data based on specified columns
+
+Messages in Kafka often use a specific field to mark the operation type, 
such as {"op_type":"delete","data":{...}}. For this kind of data, records with 
op_type=delete should be deleted from Doris.
+
+By default, DorisSink distinguishes the event type based on RowKind. In CDC 
scenarios the event type is available directly, and the hidden column 
`__DORIS_DELETE_SIGN__` is assigned accordingly to perform the deletion. For 
Kafka messages, the event type has to be determined by business logic, and the 
value of the hidden column has to be passed in explicitly.
+
+### Example
+
+```sql
+-- For example, upstream data: {"op_type":"delete","data":{"id":1,"name":"zhangsan"}}
+CREATE TABLE KAFKA_SOURCE(
+  data STRING,
+  op_type STRING
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+
+CREATE TABLE DORIS_SINK(
+  id INT,
+  name STRING,
+  __DORIS_DELETE_SIGN__ INT
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'table.identifier' = 'db.table',
+  'username' = 'root',
+  'password' = '',
+  'sink.enable-delete' = 'false',        -- false: do not derive the event 
type from RowKind
+  'sink.properties.columns' = 'id,name,__DORIS_DELETE_SIGN__'  -- Explicitly 
specify the columns imported by Stream Load
+);
+
+INSERT INTO DORIS_SINK
+SELECT CAST(json_value(data,'$.id') AS INT) as id,
+json_value(data,'$.name') as name, 
+if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
+from KAFKA_SOURCE;
+```
+
 ## Java example
 
 A Java example is provided under `samples/doris-demo/` for reference; see 
[here](https://github.com/apache/doris/tree/master/samples/doris-demo/)
diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md 
b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
index d0bf80d677..8fc3061979 100644
--- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
+++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
@@ -383,6 +383,45 @@ Flink CDC底层的采集工具是Debezium,Debezium内部使用op字段来标
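 ### Usage
 The Flink program can follow the CDC synchronization example above. After 
the job is submitted successfully, execute a statement that updates the 
primary key column (`update  student set id = '1002' where id = '1001'`) on 
the MySQL side to modify the data in Doris.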
 
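+## Use Flink to delete data based on specified columns
+
+Messages in Kafka often use a specific field to mark the operation type, such as {"op_type":"delete","data":{...}}. For this kind of data, records with op_type=delete should be deleted.
+
+By default, DorisSink distinguishes the event type based on RowKind. In CDC scenarios the event type is available directly, and the hidden column `__DORIS_DELETE_SIGN__` is assigned accordingly to perform the deletion. For Kafka messages, the event type has to be determined by business logic, and the value of the hidden column has to be passed in explicitly.
+
+### Usage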
+
+```sql
+-- For example, upstream data: {"op_type":"delete","data":{"id":1,"name":"zhangsan"}}
+CREATE TABLE KAFKA_SOURCE(
+  data STRING,
+  op_type STRING
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+
+CREATE TABLE DORIS_SINK(
+  id INT,
+  name STRING,
+  __DORIS_DELETE_SIGN__ INT
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'table.identifier' = 'db.table',
+  'username' = 'root',
+  'password' = '',
+  'sink.enable-delete' = 'false',        -- false: do not derive the event type from RowKind
+  'sink.properties.columns' = 'id,name,__DORIS_DELETE_SIGN__'  -- Explicitly 
specify the columns imported by Stream Load
+);
+
+INSERT INTO DORIS_SINK
+SELECT CAST(json_value(data,'$.id') AS INT) as id,
+json_value(data,'$.name') as name, 
+if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
+from KAFKA_SOURCE;
+```
+
 ## Java example
 
 A Java example is provided under `samples/doris-demo/` for reference; see 
[here](https://github.com/apache/doris/tree/master/samples/doris-demo/)
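
For illustration only, the projection that the documented `INSERT ... SELECT` performs can be sketched in plain Python. This is not the connector's API: the function name `to_doris_row` is hypothetical, and the sketch assumes each Kafka message is valid JSON with a nested `data` object holding `id` and `name`, as in the sample message in the docs.

```python
import json

# Name of the Doris hidden column used to flag a row for deletion.
DELETE_SIGN = "__DORIS_DELETE_SIGN__"


def to_doris_row(message: str) -> dict:
    """Map one Kafka message to the row shape written to the Doris sink.

    Hypothetical helper: mirrors json_value(data, '$.id'),
    json_value(data, '$.name') and if(op_type = 'delete', 1, 0).
    """
    record = json.loads(message)
    data = record["data"]
    return {
        "id": int(data["id"]),
        "name": data["name"],
        # 1 marks the row for deletion; any other op_type keeps it.
        DELETE_SIGN: 1 if record.get("op_type") == "delete" else 0,
    }


if __name__ == "__main__":
    sample = '{"op_type":"delete","data":{"id":1,"name":"zhangsan"}}'
    print(to_doris_row(sample))
```

Because `sink.enable-delete` is `false` in the documented configuration, the delete flag travels as an ordinary column value, which is why the mapping has to set it explicitly for every row.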


