This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new f8ea59b [mysql] add mysql chunk column options (#188) f8ea59b is described below commit f8ea59ba79419018dd495bb107410718d708b61d Author: wudi <676366...@qq.com> AuthorDate: Thu Aug 31 10:14:36 2023 +0800 [mysql] add mysql chunk column options (#188) 1. Add the MySQL chunk key column option: `--mysql-conf scan.incremental.snapshot.chunk.key-column = db.tbl1:col1,db.tbl2:col2` (the MySQL CDC version must be greater than 2.4). 2. Fix the SQL Server type mapping for IDENTITY columns. --------- Co-authored-by: wudi <> --- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 3 +- .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 51 +++++++++++++++++----- .../flink/tools/cdc/sqlserver/SqlServerType.java | 7 ++- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 29c554e..bc15987 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -94,6 +94,7 @@ public abstract class DatabaseSync { DorisSystem dorisSystem = new DorisSystem(options); List<SourceSchema> schemaList = getSchemaList(); + Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized."); if (!dorisSystem.databaseExists(database)) { LOG.info("database {} not exist, created", database); dorisSystem.createDatabase(database); @@ -118,9 +119,7 @@ public abstract class DatabaseSync { System.exit(0); } - Preconditions.checkState(!syncTables.isEmpty(), "No tables to be synchronized."); config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")"); - DataStreamSource<String> streamSource = buildCdcSource(env); SingleOutputStreamOperator<Void> parsedStream = 
streamSource.process(new ParsingProcessFunction(converter)); for (String table : 
dorisTables) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 735fdb9..05dd298 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -26,7 +26,7 @@ import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonCon import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; - +import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.DateToStringConverter; @@ -34,7 +34,9 @@ import org.apache.doris.flink.tools.cdc.SourceSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class MysqlDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class); @@ -83,13 +87,8 @@ public class MysqlDatabaseSync extends DatabaseSync { } SourceSchema sourceSchema = new MysqlSchema(metaData, databaseName, tableName, tableComment); - if 
(sourceSchema.primaryKeys.size() > 0) { - //Only sync tables with primary keys - schemaList.add(sourceSchema); - } else { - LOG.warn("table {} has no primary key, skip", tableName); - System.out.println("table " + tableName + " has no primary key, skip."); - } + sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + schemaList.add(sourceSchema); } } } @@ -136,13 +135,11 @@ public class MysqlDatabaseSync extends DatabaseSync { config .getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE) .ifPresent(sourceBuilder::splitSize); - - //Compatible with flink cdc mysql 2.3.0, close this option first - /* config + config .getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED) .ifPresent(sourceBuilder::closeIdleReaders); - **/ + setChunkColumns(sourceBuilder); String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE); if ("initial".equalsIgnoreCase(startupMode)) { sourceBuilder.startupOptions(StartupOptions.initial()); @@ -205,6 +202,36 @@ public class MysqlDatabaseSync extends DatabaseSync { return streamSource; } + /** + * set chunkkeyColumn,eg: db.table1:column1,db.table2:column2 + * @param sourceBuilder + */ + private void setChunkColumns(MySqlSourceBuilder<String> sourceBuilder) { + Map<ObjectPath, String> chunkColumnMap = getChunkColumnMap(); + for(Map.Entry<ObjectPath, String> entry : chunkColumnMap.entrySet()){ + sourceBuilder.chunkKeyColumn(entry.getKey(), entry.getValue()); + } + } + + private Map<ObjectPath, String> getChunkColumnMap(){ + Map<ObjectPath, String> chunkMap = new HashMap<>(); + String chunkColumn = config.getString(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); + if(!StringUtils.isNullOrWhitespaceOnly(chunkColumn)){ + final Pattern chunkPattern = Pattern.compile("(\\S+)\\.(\\S+):(\\S+)"); + String[] tblColumns = chunkColumn.split(","); + for(String tblCol : tblColumns){ + Matcher matcher = chunkPattern.matcher(tblCol); + if(matcher.find()){ 
+ String db = matcher.group(1); + String table = matcher.group(2); + String col = matcher.group(3); + chunkMap.put(new ObjectPath(db, table), col); + } + } + } + return chunkMap; + } + private Properties getJdbcProperties(){ Properties jdbcProps = new Properties(); for (Map.Entry<String, String> entry : config.toMap().entrySet()) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java index f09bd76..f2895c6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java @@ -47,8 +47,11 @@ public class SqlServerType { private static final String BINARY = "binary"; private static final String VARBINARY = "varbinary"; - public static String toDorisType(String sqlServerType, Integer precision, Integer scale) { - sqlServerType = sqlServerType.toLowerCase(); + public static String toDorisType(String originSqlServerType, Integer precision, Integer scale) { + originSqlServerType = originSqlServerType.toLowerCase(); + // For sqlserver IDENTITY type, such as 'INT IDENTITY' + // originSqlServerType is "int identity", so we only get "int". + String sqlServerType = originSqlServerType.split(" ")[0]; switch (sqlServerType){ case BIT: return DorisType.BOOLEAN; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org