This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 43f9e014 [Fix](cdc) fix enable-delete  option not work (#455)
43f9e014 is described below

commit 43f9e014c4742949e9625106252a375a1e51b82c
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Mon Aug 5 18:19:53 2024 +0800

    [Fix](cdc) fix enable-delete  option not work (#455)
---
 .../serializer/JsonDebeziumSchemaSerializer.java   |   5 +-
 .../jsondebezium/JsonDebeziumChangeContext.java    |   9 +-
 .../jsondebezium/JsonDebeziumDataChange.java       |   4 +-
 .../MongoDBJsonDebeziumSchemaSerializer.java       |   5 +-
 .../serializer/MongoJsonDebeziumDataChange.java    |   4 +-
 .../jsondebezium/TestJsonDebeziumDataChange.java   |   6 +-
 .../TestJsonDebeziumSchemaChangeImpl.java          |   3 +-
 .../TestJsonDebeziumSchemaChangeImplV2.java        |   3 +-
 .../jsondebezium/TestSQLParserSchemaChange.java    |   3 +-
 .../doris/flink/tools/cdc/MySQLDorisE2ECase.java   | 116 +++++++++++++++++++++
 10 files changed, 148 insertions(+), 10 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 83dec58b..9c89fce3 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -70,6 +70,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private final boolean newSchemaChange;
     private String lineDelimiter = LINE_DELIMITER_DEFAULT;
     private boolean ignoreUpdateBefore = true;
+    private boolean enableDelete = true;
     // <cdc db.schema.table, doris db.table>
     private Map<String, String> tableMapping;
     // create table properties
@@ -111,6 +112,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                             .getStreamLoadProp()
                             .getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT);
             this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
+            this.enableDelete = executionOptions.getDeletable();
         }
     }
 
@@ -149,7 +151,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                         lineDelimiter,
                         ignoreUpdateBefore,
                         targetTablePrefix,
-                        targetTableSuffix);
+                        targetTableSuffix,
+                        enableDelete);
         initSchemaChangeInstance(changeContext);
         this.dataChange = new JsonDebeziumDataChange(changeContext);
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index d1326c72..2f7764f3 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -41,6 +41,7 @@ public class JsonDebeziumChangeContext implements 
Serializable {
     private final Pattern pattern;
     private final String lineDelimiter;
     private final boolean ignoreUpdateBefore;
+    private final boolean enableDelete;
     private final String targetTablePrefix;
     private final String targetTableSuffix;
 
@@ -55,7 +56,8 @@ public class JsonDebeziumChangeContext implements 
Serializable {
             String lineDelimiter,
             boolean ignoreUpdateBefore,
             String targetTablePrefix,
-            String targetTableSuffix) {
+            String targetTableSuffix,
+            boolean enableDelete) {
         this.dorisOptions = dorisOptions;
         this.tableMapping = tableMapping;
         this.sourceTableName = sourceTableName;
@@ -65,6 +67,7 @@ public class JsonDebeziumChangeContext implements 
Serializable {
         this.pattern = pattern;
         this.lineDelimiter = lineDelimiter;
         this.ignoreUpdateBefore = ignoreUpdateBefore;
+        this.enableDelete = enableDelete;
         this.targetTablePrefix = targetTablePrefix;
         this.targetTableSuffix = targetTableSuffix;
     }
@@ -116,6 +119,10 @@ public class JsonDebeziumChangeContext implements 
Serializable {
         return targetTableSuffix;
     }
 
+    public boolean enableDelete() {
+        return enableDelete;
+    }
+
     public DorisTableConfig getDorisTableConf() {
         return dorisTableConfig;
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
index 5075adf8..298cfb95 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
@@ -50,6 +50,7 @@ public class JsonDebeziumDataChange extends CdcDataChange {
     private final ObjectMapper objectMapper;
     private final DorisOptions dorisOptions;
     private final boolean ignoreUpdateBefore;
+    private final boolean enableDelete;
     private final String lineDelimiter;
     private final JsonDebeziumChangeContext changeContext;
 
@@ -59,6 +60,7 @@ public class JsonDebeziumDataChange extends CdcDataChange {
         this.objectMapper = changeContext.getObjectMapper();
         this.ignoreUpdateBefore = changeContext.isIgnoreUpdateBefore();
         this.lineDelimiter = changeContext.getLineDelimiter();
+        this.enableDelete = changeContext.enableDelete();
     }
 
     public DorisRecord serialize(String record, JsonNode recordRoot, String 
op) throws IOException {
@@ -87,7 +89,7 @@ public class JsonDebeziumDataChange extends CdcDataChange {
                 return DorisRecord.of(dorisTableIdentifier, 
extractUpdate(recordRoot));
             case OP_DELETE:
                 valueMap = extractBeforeRow(recordRoot);
-                addDeleteSign(valueMap, true);
+                addDeleteSign(valueMap, enableDelete);
                 break;
             default:
                 LOG.error("parse record fail, unknown op {} in {}", op, 
record);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
index b7faec2f..d4a87ff8 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -50,6 +50,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
     private final String sourceTableName;
     private String lineDelimiter = LINE_DELIMITER_DEFAULT;
     private boolean ignoreUpdateBefore = true;
+    private boolean enableDelete = true;
     // <cdc db.schema.table, doris db.table>
     private Map<String, String> tableMapping;
     // create table properties
@@ -90,6 +91,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
                             .getStreamLoadProp()
                             .getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT);
             this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
+            this.enableDelete = executionOptions.getDeletable();
         }
         init();
     }
@@ -107,7 +109,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
                         lineDelimiter,
                         ignoreUpdateBefore,
                         targetTablePrefix,
-                        targetTableSuffix);
+                        targetTableSuffix,
+                        enableDelete);
         this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
         this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
index 8048e38a..9dbe7ffe 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
@@ -61,6 +61,7 @@ public class MongoJsonDebeziumDataChange extends 
CdcDataChange implements Change
     public JsonDebeziumChangeContext changeContext;
     public ObjectMapper objectMapper;
     public Map<String, String> tableMapping;
+    private final boolean enableDelete;
 
     public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext 
changeContext) {
         this.changeContext = changeContext;
@@ -68,6 +69,7 @@ public class MongoJsonDebeziumDataChange extends 
CdcDataChange implements Change
         this.objectMapper = changeContext.getObjectMapper();
         this.lineDelimiter = changeContext.getLineDelimiter();
         this.tableMapping = changeContext.getTableMapping();
+        this.enableDelete = changeContext.enableDelete();
     }
 
     @Override
@@ -93,7 +95,7 @@ public class MongoJsonDebeziumDataChange extends 
CdcDataChange implements Change
                 break;
             case OP_DELETE:
                 valueMap = extractDeleteRow(recordRoot);
-                addDeleteSign(valueMap, true);
+                addDeleteSign(valueMap, enableDelete);
                 break;
             default:
                 LOG.error("parse record fail, unknown op {} in {}", op, 
record);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
index 4891d820..f8098ccc 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -50,7 +50,8 @@ public class TestJsonDebeziumDataChange extends 
TestJsonDebeziumChangeBase {
                         lineDelimiter,
                         ignoreUpdateBefore,
                         "",
-                        "");
+                        "",
+                        true);
         dataChange = new JsonDebeziumDataChange(changeContext);
     }
 
@@ -113,7 +114,8 @@ public class TestJsonDebeziumDataChange extends 
TestJsonDebeziumChangeBase {
                         lineDelimiter,
                         false,
                         "",
-                        "");
+                        "",
+                        true);
         dataChange = new JsonDebeziumDataChange(changeContext);
 
         // update t1 set name='doris-update' WHERE id =1;
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
index caf5542c..e66ecaab 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
@@ -63,7 +63,8 @@ public class TestJsonDebeziumSchemaChangeImpl extends 
TestJsonDebeziumChangeBase
                         lineDelimiter,
                         ignoreUpdateBefore,
                         "",
-                        "");
+                        "",
+                        true);
         schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
     }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 68239618..3eec0fb6 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -83,7 +83,8 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
                         lineDelimiter,
                         ignoreUpdateBefore,
                         "",
-                        "");
+                        "",
+                        true);
         schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
     }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
index d3194f81..a7958b70 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -46,7 +46,8 @@ public class TestSQLParserSchemaChange extends 
TestJsonDebeziumChangeBase {
                         lineDelimiter,
                         ignoreUpdateBefore,
                         "",
-                        "");
+                        "",
+                        true);
         schemaChange = new SQLParserSchemaChange(changeContext);
     }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 6a613841..aeb17c29 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -503,6 +503,122 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         jobClient.cancel().get();
     }
 
+    @Test
+    public void testMySQL2DorisEnableDelete() throws Exception {
+        printClusterStatus();
+        initializeMySQLTable();
+        initializeDorisTable();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        Map<String, String> flinkMap = new HashMap<>();
+        flinkMap.put("execution.checkpointing.interval", "10s");
+        flinkMap.put("pipeline.operator-chaining", "false");
+        flinkMap.put("parallelism.default", "1");
+
+        Configuration configuration = Configuration.fromMap(flinkMap);
+        env.configure(configuration);
+
+        String database = DATABASE;
+        Map<String, String> mysqlConfig = new HashMap<>();
+        mysqlConfig.put("database-name", DATABASE);
+        mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
+        mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
+        mysqlConfig.put("username", MYSQL_USER);
+        mysqlConfig.put("password", MYSQL_PASSWD);
+        mysqlConfig.put("server-time-zone", "Asia/Shanghai");
+        Configuration config = Configuration.fromMap(mysqlConfig);
+
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", getFenodes());
+        sinkConfig.put("username", USERNAME);
+        sinkConfig.put("password", PASSWORD);
+        sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, 
DORIS_CONTAINER.getHost()));
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        sinkConfig.put("sink.check-interval", "5000");
+        sinkConfig.put("sink.enable-delete", "false");
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+
+        String includingTables = "tbl1|tbl2|tbl3|tbl5";
+        String excludingTables = "";
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setIgnoreDefaultValue(false)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(true)
+                // disable single-sink mode
+                .setSingleSink(false)
+                .create();
+        databaseSync.build();
+        JobClient jobClient = env.executeAsync();
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(10)));
+
+        // wait for two checkpoint intervals (2 x 10s)
+        Thread.sleep(20000);
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", 
"doris_3,3", "doris_5,5");
+        String sql =
+                "select * from ( select * from %s.%s union all select * from 
%s.%s union all select * from %s.%s union all select * from %s.%s) res order by 
1";
+        String query1 =
+                String.format(
+                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, 
TABLE_3, DATABASE,
+                        TABLE_5);
+        checkResult(expected, query1, 2);
+
+        // add incremental data
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, 
MYSQL_PASSWD);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format("insert into %s.%s  values 
('doris_1_1',10)", DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("insert into %s.%s  values 
('doris_2_1',11)", DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("insert into %s.%s  values 
('doris_3_1',12)", DATABASE, TABLE_3));
+
+            statement.execute(
+                    String.format(
+                            "update %s.%s set age=18 where name='doris_1'", 
DATABASE, TABLE_1));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_2'", 
DATABASE, TABLE_2));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_3'", 
DATABASE, TABLE_3));
+            statement.execute(
+                    String.format("delete from %s.%s where name='doris_5'", 
DATABASE, TABLE_5));
+        }
+
+        Thread.sleep(20000);
+        List<String> expected2 =
+                Arrays.asList(
+                        "doris_1,18",
+                        "doris_1_1,10",
+                        "doris_2,2",
+                        "doris_2_1,11",
+                        "doris_3,3",
+                        "doris_3_1,12",
+                        "doris_5,5");
+        sql =
+                "select * from ( select * from %s.%s union all select * from 
%s.%s union all select * from %s.%s union all select * from %s.%s) res order by 
1";
+        String query2 =
+                String.format(
+                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, 
TABLE_3, DATABASE,
+                        TABLE_5);
+        checkResult(expected2, query2, 2);
+        jobClient.cancel().get();
+    }
+
     private void initializeDorisTable() throws Exception {
         try (Connection connection =
                         DriverManager.getConnection(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to