This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c03f7c3ba4 [sample](flink-connector) add doris data delete function 
(#12599)
c03f7c3ba4 is described below

commit c03f7c3ba427dbb12fd341e0f0df8dc9fee640ce
Author: liwenqi1996 <58578174+liwenqi1...@users.noreply.github.com>
AuthorDate: Wed Sep 14 19:18:59 2022 +0800

    [sample](flink-connector) add doris data delete function (#12599)
---
 .../demo/flink/dbsync/DatabaseFullDelSync.java     | 244 +++++++++++++++++++++
 1 file changed, 244 insertions(+)

diff --git 
a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
 
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
new file mode 100644
index 0000000000..22da0b0f84
--- /dev/null
+++ 
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.dbsync;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+
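+/**
+ * Sample Flink job that synchronizes MySQL tables to Doris through Flink CDC.
+ * Inserts, updates and snapshot reads are written as upserts; deletes are
+ * propagated through the {@code __DORIS_DELETE_SIGN__} column.
+ */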
+public class DatabaseFullDelSync {
+
+    // ---- MySQL (CDC source) connection settings ----
+    private static final String SOURCE_DB = "custom_db"; // database to capture
+    // Tables to capture, written as "db.table". The same value is also applied as a
+    // regular expression (String.matches) when tables are read from information_schema.
+    private static final String SOURCE_TABLS = "custom_db.test_flink_doris";
+    private static final String SOURCE_IP = "127.0.0.1"; // hostname
+    private static final String SOURCE_USER = "root"; // username
+    private static final String SOURCE_PWD = "root"; // password
+    private static final int SOURCE_PORT = 3306;
+
+    // ---- Doris (sink) connection settings ----
+    private static final String DORIS_IP = "127.0.0.1";
+    private static final String DORIS_PORT = "8030"; // joined with DORIS_IP as "ip:port" for the FE nodes
+    private static final String DORIS_USER = "root";
+    private static final String DORIS_PWD = "root";
+    // Despite the name this is passed to setTableIdentifier, i.e. it holds "db.table".
+    private static final String DORIS_DB = "table_structure.test_flink_doris";
+
+    private static final Logger log = LoggerFactory.getLogger(DatabaseFullDelSync.class);
+
+    public static void main(String[] args) throws Exception {
+
+        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+            .hostname(SOURCE_IP)
+            .port(SOURCE_PORT)
+            .databaseList(SOURCE_DB) // set captured database
+            .tableList(SOURCE_TABLS) // set captured table
+            .username(SOURCE_USER) // set captured user
+            .password(SOURCE_PWD) // set captured pwd
+            .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
SourceRecord to JSON String
+            .build();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        // enable checkpoint
+        env.enableCheckpointing(10000);
+        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, 
WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
+        //get table list
+        List<String> tableList = getTableList();
+        if (null != tableList && tableList.size() > 0) {
+            //get column map
+            Map<String, String> tableColumn = getTableColumn();
+            for (String tbl : tableList) {
+                String column = tableColumn.get(tbl);
+                SingleOutputStreamOperator fifterStream = 
fifterTableData(cdcSource, tbl);
+                SingleOutputStreamOperator<String> cleanStream = 
cleanData(fifterStream);
+                DorisSink dorisSink = buildDorisSink(tbl, column);
+                cleanStream.sinkTo(dorisSink).name(tbl);
+            }
+            env.execute("Full Database Sync");
+        }
+
+    }
+
+
+    /**
+     * Keeps only the change events that belong to the given table.
+     *
+     * Each record is expected to be a change event serialized as JSON, e.g.
+     * {
+     * "before":null,
+     * "after":{
+     * "id":1,
+     * "name":"zhangsan-1",
+     * "age":18
+     * },
+     * "source":{
+     * "db":"test",
+     * "table":"test_1",
+     * ...
+     * },
+     * "op":"c",
+     * ...
+     * }
+     *
+     * Only "source.table" is compared (the database name is not checked). Records
+     * that cannot be parsed, or that carry no "source" object, are dropped.
+     *
+     * @param cdcSource stream of JSON change events
+     * @param table     name of the table whose events are kept
+     * @return the filtered stream
+     */
+    public static SingleOutputStreamOperator<String> fifterTableData(
+            DataStreamSource<String> cdcSource, String table) {
+        return cdcSource.filter(new FilterFunction<String>() {
+            @Override
+            public boolean filter(String row) throws Exception {
+                try {
+                    JSONObject rowJson = JSON.parseObject(row);
+                    JSONObject source = rowJson == null ? null : rowJson.getJSONObject("source");
+                    if (source == null) {
+                        // not a row change event
+                        return false;
+                    }
+                    return table != null && table.equals(source.getString("table"));
+                } catch (RuntimeException ex) {
+                    // malformed JSON must not fail the job; drop the record instead
+                    log.warn("drop record that is not a valid change event: {}", row, ex);
+                    return false;
+                }
+            }
+        });
+    }
+
+
+    //cleanData
+    public static SingleOutputStreamOperator<String> 
cleanData(SingleOutputStreamOperator source) {
+        return source.flatMap(new FlatMapFunction<String, String>() {
+            @Override
+            public void flatMap(String row, Collector<String> out) throws 
Exception {
+                try {
+                    JSONObject rowJson = JSON.parseObject(row);
+                    String op = rowJson.getString("op");
+                    if (Arrays.asList("c", "r", "u").contains(op)) {
+                        JSONObject after = rowJson.getJSONObject("after");
+                        after.put("__DORIS_DELETE_SIGN__", false);
+                        out.collect(after.toJSONString());
+                    } else if ("d".equals(op)) {
+                        JSONObject before = rowJson.getJSONObject("before");
+                        before.put("__DORIS_DELETE_SIGN__", true);
+                        out.collect(before.toJSONString());
+                    } else {
+                        log.info("filter other op:{}", op);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    log.warn("filter other format binlog:{} ", row);
+                }
+            }
+        });
+    }
+
+    /**
+     * Creates the Doris sink used for one source table.
+     *
+     * NOTE(review): the target is always DORIS_DB, whatever {@code table} is; confirm
+     * this is intended when more than one table is captured.
+     *
+     * @param table       source table name; only used to build the stream load label prefix
+     * @param tableColumn comma separated column list of the table
+     * @return a sink that stream-loads JSON rows with deletion enabled
+     * @throws IllegalArgumentException if {@code tableColumn} is null or empty
+     */
+    public static DorisSink<String> buildDorisSink(String table, String tableColumn) {
+        if (tableColumn == null || tableColumn.isEmpty()) {
+            // would otherwise produce the invalid header "null,`__DORIS_DELETE_SIGN__`"
+            throw new IllegalArgumentException("column list of table " + table + " must not be empty");
+        }
+
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes(DORIS_IP + ":" + DORIS_PORT)
+            .setTableIdentifier(DORIS_DB)
+            .setUsername(DORIS_USER)
+            .setPassword(DORIS_PWD);
+
+        Properties pro = new Properties();
+        // json data format
+        pro.setProperty("format", "json");
+        pro.setProperty("read_json_by_line", "true");
+        // the delete sign column has to be listed explicitly so that deletes are applied
+        pro.setProperty("columns", tableColumn + ",`__DORIS_DELETE_SIGN__`");
+
+        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
+            .setLabelPrefix("label-" + System.currentTimeMillis() + table) // streamload label prefix
+            .setStreamLoadProp(pro)
+            .setDeletable(true)
+            .build();
+
+        DorisSink.Builder<String> builder = DorisSink.builder();
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+            .setDorisExecutionOptions(executionOptions)
+            .setSerializer(new SimpleStringSerializer()) // serialize according to string
+            .setDorisOptions(dorisBuilder.build());
+
+        return builder.build();
+    }
+
+
+    public static List<String> getTableList() {
+        List<String> list = new ArrayList<>();
+        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM 
information_schema.tables  WHERE TABLE_SCHEMA = '" + SOURCE_DB + "'";
+        try {
+            List<JSONObject> JSONObject = JdbcUtil.executeQuery(SOURCE_IP, 
SOURCE_PORT, SOURCE_USER, SOURCE_PWD, sql);
+            if (null != JSONObject && JSONObject.size() > 0) {
+                for (JSONObject json : JSONObject) {
+                    String tableSchema = json.getString("TABLE_SCHEMA");
+                    String tableName = json.getString("TABLE_NAME");
+                    String dbTable = tableSchema + "." + tableName;
+                    if (dbTable.matches(SOURCE_TABLS)) {
+                        list.add(tableName);
+                    }
+                }
+                return list;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+        return null;
+    }
+
+
+    public static Map<String, String> getTableColumn() {
+        Map<String, String> reMap = new HashMap<>();
+        String sql = "select 
TABLE_SCHEMA,TABLE_NAME,GROUP_CONCAT('`',COLUMN_NAME,'`') AS columnStr from 
information_schema.columns" +
+            " WHERE TABLE_SCHEMA =  '" + SOURCE_DB + "'" + " GROUP BY 
TABLE_SCHEMA,TABLE_NAME";
+        try {
+            List<JSONObject> JSONObject = JdbcUtil.executeQuery(SOURCE_IP, 
SOURCE_PORT, SOURCE_USER, SOURCE_PWD, sql);
+            if (null != JSONObject && JSONObject.size() > 0) {
+                for (JSONObject json : JSONObject) {
+                    String tableSchema = json.getString("TABLE_SCHEMA");
+                    String tableName = json.getString("TABLE_NAME");
+                    String columnStr = json.getString("columnStr");
+                    String dbTable = tableSchema + "." + tableName;
+                    if (dbTable.matches(SOURCE_TABLS)) {
+                        reMap.put(tableName, columnStr);
+                    }
+                }
+                return reMap;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+        return null;
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to