This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f8ea59b  [mysql] add mysql chunk column options (#188)
f8ea59b is described below

commit f8ea59ba79419018dd495bb107410718d708b61d
Author: wudi <676366...@qq.com>
AuthorDate: Thu Aug 31 10:14:36 2023 +0800

    [mysql] add mysql chunk column options (#188)
    
    1. Add the MySQL chunk key column option:
    
        `--mysql-conf scan.incremental.snapshot.chunk.key-column = db.tbl1:col1,db.tbl2:col2`
    
        mysqlcdc version must be greater than 2.4
    
    2. Fix SQL Server type mapping for IDENTITY columns (e.g. `INT IDENTITY`)
    
    ---------
    
    Co-authored-by: wudi <>
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  3 +-
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   | 51 +++++++++++++++++-----
 .../flink/tools/cdc/sqlserver/SqlServerType.java   |  7 ++-
 3 files changed, 45 insertions(+), 16 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 29c554e..bc15987 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -94,6 +94,7 @@ public abstract class DatabaseSync {
         DorisSystem dorisSystem = new DorisSystem(options);
 
         List<SourceSchema> schemaList = getSchemaList();
+        Preconditions.checkState(!schemaList.isEmpty(), "No tables to be 
synchronized.");
         if (!dorisSystem.databaseExists(database)) {
             LOG.info("database {} not exist, created", database);
             dorisSystem.createDatabase(database);
@@ -118,9 +119,7 @@ public abstract class DatabaseSync {
             System.exit(0);
         }
 
-        Preconditions.checkState(!syncTables.isEmpty(), "No tables to be 
synchronized.");
         config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", 
syncTables) + ")");
-
         DataStreamSource<String> streamSource = buildCdcSource(env);
         SingleOutputStreamOperator<Void> parsedStream = 
streamSource.process(new ParsingProcessFunction(converter));
         for (String table : dorisTables) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 735fdb9..05dd298 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -26,7 +26,7 @@ import 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonCon
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
-
+import org.apache.doris.flink.catalog.doris.DataModel;
 import 
org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.DateToStringConverter;
@@ -34,7 +34,9 @@ import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class MysqlDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(MysqlDatabaseSync.class);
@@ -83,13 +87,8 @@ public class MysqlDatabaseSync extends DatabaseSync {
                     }
                     SourceSchema sourceSchema =
                             new MysqlSchema(metaData, databaseName, tableName, 
tableComment);
-                    if (sourceSchema.primaryKeys.size() > 0) {
-                        //Only sync tables with primary keys
-                        schemaList.add(sourceSchema);
-                    } else {
-                        LOG.warn("table {} has no primary key, skip", 
tableName);
-                        System.out.println("table " + tableName + " has no 
primary key, skip.");
-                    }
+                    sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 
? DataModel.UNIQUE : DataModel.DUPLICATE);
+                    schemaList.add(sourceSchema);
                 }
             }
         }
@@ -136,13 +135,11 @@ public class MysqlDatabaseSync extends DatabaseSync {
         config
                 
.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
                 .ifPresent(sourceBuilder::splitSize);
-
-        //Compatible with flink cdc mysql 2.3.0, close this option first
-        /* config
+        config
                 
.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED)
                 .ifPresent(sourceBuilder::closeIdleReaders);
-         **/
 
+        setChunkColumns(sourceBuilder);
         String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
         if ("initial".equalsIgnoreCase(startupMode)) {
             sourceBuilder.startupOptions(StartupOptions.initial());
@@ -205,6 +202,36 @@ public class MysqlDatabaseSync extends DatabaseSync {
         return streamSource;
     }
 
+    /**
+     * set chunkkeyColumn,eg: db.table1:column1,db.table2:column2
+     * @param sourceBuilder
+     */
+    private void setChunkColumns(MySqlSourceBuilder<String> sourceBuilder) {
+        Map<ObjectPath, String> chunkColumnMap = getChunkColumnMap();
+        for(Map.Entry<ObjectPath, String> entry : chunkColumnMap.entrySet()){
+            sourceBuilder.chunkKeyColumn(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private Map<ObjectPath, String> getChunkColumnMap(){
+        Map<ObjectPath, String> chunkMap = new HashMap<>();
+        String chunkColumn = 
config.getString(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+        if(!StringUtils.isNullOrWhitespaceOnly(chunkColumn)){
+            final Pattern chunkPattern = 
Pattern.compile("(\\S+)\\.(\\S+):(\\S+)");
+            String[] tblColumns = chunkColumn.split(",");
+            for(String tblCol : tblColumns){
+                Matcher matcher = chunkPattern.matcher(tblCol);
+                if(matcher.find()){
+                    String db = matcher.group(1);
+                    String table = matcher.group(2);
+                    String col = matcher.group(3);
+                    chunkMap.put(new ObjectPath(db, table), col);
+                }
+            }
+        }
+        return chunkMap;
+    }
+
     private Properties getJdbcProperties(){
         Properties jdbcProps = new Properties();
         for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
index f09bd76..f2895c6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
@@ -47,8 +47,11 @@ public class SqlServerType {
     private static final String BINARY = "binary";
     private static final String VARBINARY = "varbinary";
 
-    public static String toDorisType(String sqlServerType, Integer precision, 
Integer scale) {
-        sqlServerType = sqlServerType.toLowerCase();
+    public static String toDorisType(String originSqlServerType, Integer 
precision, Integer scale) {
+        originSqlServerType = originSqlServerType.toLowerCase();
+        // For sqlserver IDENTITY type, such as 'INT IDENTITY'
+        // originSqlServerType is "int identity", so we only get "int".
+        String sqlServerType = originSqlServerType.split(" ")[0];
         switch (sqlServerType){
             case BIT:
                 return DorisType.BOOLEAN;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to