This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 0eb3aa7  [Feature](CDC) Add auto create table (#248)
0eb3aa7 is described below

commit 0eb3aa7075befc7478e014bbadd754af17deaf75
Author: wudi <676366...@qq.com>
AuthorDate: Thu Nov 30 17:53:00 2023 +0800

    [Feature](CDC) Add auto create table (#248)
---
 .../apache/doris/flink/sink/writer/EventType.java} |  25 +---
 .../serializer/JsonDebeziumSchemaSerializer.java   | 147 ++++++++++++++++-----
 .../doris/flink/table/DorisConfigOptions.java      |   2 +-
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |   1 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  27 +++-
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   |  10 +-
 .../flink/tools/cdc/oracle/OracleDatabaseSync.java |  10 +-
 .../tools/cdc/postgres/PostgresDatabaseSync.java   |  10 +-
 .../tools/cdc/sqlserver/SqlServerDatabaseSync.java |  10 +-
 .../writer/TestJsonDebeziumSchemaSerializer.java   |  31 ++++-
 .../doris/flink/tools/cdc/DatabaseSyncTest.java    |  16 +++
 11 files changed, 225 insertions(+), 64 deletions(-)

diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
similarity index 51%
copy from flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
index daab90b..d26bf27 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
@@ -14,27 +14,10 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.flink.tools.cdc;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
-import org.junit.Test;
-import java.util.Arrays;
+package org.apache.doris.flink.sink.writer;
 
-/**
- * Unit tests for the {@link DatabaseSync}.
- **/
-public class DatabaseSyncTest {
-    @Test
-    public void multiToOneRulesParserTest() throws Exception{
-        String[][] testCase = {
-                {"a_.*|b_.*","a|b"} // Normal condition
-//                ,{"a_.*|b_.*","a|b|c"} // Unequal length
-//                ,{"",""} // Null value
-//                ,{"***....","a"} // Abnormal regular expression
-        };
-        DatabaseSync databaseSync = new MysqlDatabaseSync();
-        Arrays.stream(testCase).forEach(arr->{
-            databaseSync.multiToOneRulesParser(arr[0], arr[1]);
-        });
-    }
+public enum EventType {
+    ALTER,
+    CREATE
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 0c4309b..29d9cee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -26,12 +26,14 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
 import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
@@ -41,6 +43,7 @@ import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -67,7 +71,6 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     private static final String OP_CREATE = "c"; // insert
     private static final String OP_UPDATE = "u"; // update
     private static final String OP_DELETE = "d"; // delete
-    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add cloumn aca int
     private static final String addDropDDLRegex
             = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
@@ -214,25 +217,40 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
             return false;
         }
-        // db,table
-        Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
-        if(tuple == null){
+        EventType eventType = extractEventType(recordRoot);
+        if(eventType == null){
             return false;
         }
-
-        List<String> ddlSqlList = extractDDLList(recordRoot);
-        if (CollectionUtils.isEmpty(ddlSqlList)) {
-            LOG.info("ddl can not do schema change:{}", recordRoot);
-            return false;
-        }
-
-        List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
-        for (int i = 0; i < ddlSqlList.size(); i++) {
-            DDLSchema ddlSchema = ddlSchemas.get(i);
-            String ddlSql = ddlSqlList.get(i);
-            boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
-            status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
-            LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+        if(eventType.equals(EventType.CREATE)){
+            TableSchema tableSchema = extractCreateTableSchema(recordRoot);
+            status = schemaChangeManager.createTable(tableSchema);
+            if(status){
+                String cdcTbl = getCdcTableIdentifier(recordRoot);
+                String dorisTbl = getCreateTableIdentifier(recordRoot);
+                tableMapping.put(cdcTbl, dorisTbl);
+                LOG.info("create table ddl status: {}", status);
+            }
+        } else if (eventType.equals(EventType.ALTER)){
+            // db,table
+            Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+            if(tuple == null){
+                return false;
+            }
+            List<String> ddlSqlList = extractDDLList(recordRoot);
+            if (CollectionUtils.isEmpty(ddlSqlList)) {
+                LOG.info("ddl can not do schema change:{}", recordRoot);
+                return false;
+            }
+            List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
+            for (int i = 0; i < ddlSqlList.size(); i++) {
+                DDLSchema ddlSchema = ddlSchemas.get(i);
+                String ddlSql = ddlSqlList.get(i);
+                boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
+                status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
+                LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+            }
+        } else{
+            LOG.info("Unsupported event type {}", eventType);
         }
     } catch (Exception ex) {
         LOG.warn("schema change error :", ex);
@@ -240,18 +258,26 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return status;
     }
 
+    protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        JsonNode tableChanges = historyRecord.get("tableChanges");
+        if(!Objects.isNull(tableChanges)){
+            JsonNode tableChange = tableChanges.get(0);
+            return tableChange;
+        }
+        return null;
+    }
+
+    /**
+     * Parse Alter Event
+     */
     @VisibleForTesting
-    public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
+    public List<String> extractDDLList(JsonNode record) throws IOException{
         String dorisTable = getDorisTableIdentifier(record);
         JsonNode historyRecord = extractHistoryRecord(record);
-        JsonNode tableChanges = historyRecord.get("tableChanges");
         String ddl = extractJsonNode(historyRecord, "ddl");
-        if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
-            return new ArrayList<>();
-        }
-        LOG.debug("received debezium ddl :{}", ddl);
-        JsonNode tableChange = tableChanges.get(0);
-        if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) {
+        JsonNode tableChange = extractTableChange(record);
+        if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
             return null;
         }
@@ -285,6 +311,47 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return SchemaChangeHelper.generateDDLSql(dorisTable);
     }
 
+    @VisibleForTesting
+    public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+        String dorisTable = getCreateTableIdentifier(record);
+        JsonNode tableChange = extractTableChange(record);
+        JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
+        JsonNode columns = tableChange.get("table").get("columns");
+        String tblComment = tableChange.get("table").get("comment").asText();
tableChange.get("table").get("comment").asText(); + Map<String, FieldSchema> field = new LinkedHashMap<>(); + for (JsonNode column : columns) { + buildFieldSchema(field, column); + } + List<String> pkList = new ArrayList<>(); + for(JsonNode column : pkColumns){ + String fieldName = column.asText(); + pkList.add(fieldName); + } + + TableSchema tableSchema = new TableSchema(); + tableSchema.setFields(field); + tableSchema.setKeys(pkList); + tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field)); + tableSchema.setTableComment(tblComment); + + String[] split = dorisTable.split("\\."); + Preconditions.checkArgument(split.length == 2); + tableSchema.setDatabase(split[0]); + tableSchema.setTable(split[1]); + return tableSchema; + } + + private List<String> buildDistributeKeys(List<String> primaryKeys, Map<String, FieldSchema> fields) { + if (!CollectionUtil.isNullOrEmpty(primaryKeys)) { + return primaryKeys; + } + if(!fields.isEmpty()){ + Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next(); + return Collections.singletonList(firstField.getKey()); + } + return new ArrayList<>(); + } + @VisibleForTesting public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) { this.originFieldSchemaMap = originFieldSchemaMap; @@ -334,6 +401,12 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return SourceSchema.getString(db, schema, table); } + public String getCreateTableIdentifier(JsonNode record){ + String db = extractJsonNode(record.get("source"), "db"); + String table = extractJsonNode(record.get("source"), "table"); + return db + "." + table; + } + public String getDorisTableIdentifier(String cdcTableIdentifier){ if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){ return dorisOptions.getTableIdentifier(); @@ -405,6 +478,23 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return extractJsonNode(record.get("source"), "table"); } + /** + * Parse event type + */ + protected EventType extractEventType(JsonNode record) throws JsonProcessingException { + JsonNode tableChange = extractTableChange(record); + if(tableChange == null || tableChange.get("type") == null){ + return null; + } + String type = tableChange.get("type").asText(); + if(EventType.ALTER.toString().equalsIgnoreCase(type)){ + return EventType.ALTER; + }else if(EventType.CREATE.toString().equalsIgnoreCase(type)){ + return EventType.CREATE; + } + return null; + } + private String extractJsonNode(JsonNode record, String key) { return record != null && record.get(key) != null && !(record.get(key) instanceof NullNode) ? 
                 record.get(key).asText() : null;
@@ -425,7 +515,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
-        if (record.has("historyRecord")) {
+        if (record != null && record.has("historyRecord")) {
             return objectMapper.readTree(record.get("historyRecord").asText());
         }
         // The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction
@@ -452,8 +542,6 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return null;
     }
 
-
-
     @VisibleForTesting
     public void fillOriginSchema(JsonNode columns) {
         if (Objects.nonNull(originFieldSchemaMap)) {
@@ -623,5 +711,4 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return type;
     }
-
 }
\ No newline at end of file
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index af01ea5..546dc6e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -89,7 +89,7 @@ public class DorisConfigOptions {
             .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
             .withDescription("");
     public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
-            .key("doris.request.retriesdoris.deserialize.queue.size")
+            .key("doris.deserialize.queue.size")
             .intType()
             .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
             .withDescription("");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index d05fa15..fe35ee3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -129,6 +129,7 @@ public class CdcTools {
                 .setTableConfig(tableMap)
                 .setCreateTableOnly(createTableOnly)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setSingleSink(singleSink)
                 .create();
         databaseSync.build();
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index bf26214..02ab034 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public abstract class DatabaseSync {
     private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
@@ -83,6 +84,11 @@ public abstract class DatabaseSync {
 
     public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);
 
+    /**
+     * Get the prefix of a specific tableList, for example, mysql is database, oracle is schema
+     */
+    public abstract String getTableListPrefix();
+
     public DatabaseSync() throws SQLException {
         registerDriver();
     }
@@ -132,8 +138,7 @@ public abstract class DatabaseSync {
             System.out.println("Create table finished.");
             System.exit(0);
         }
-
-        config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");
String.join("|", syncTables) + ")"); + config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables)); DataStreamSource<String> streamSource = buildCdcSource(env); if(singleSink){ streamSource.sinkTo(buildDorisSink()); @@ -256,6 +261,24 @@ public abstract class DatabaseSync { LOG.debug("table {} is synchronized? {}", tableName, sync); return sync; } + + protected String getSyncTableList(List<String> syncTables){ + if(!singleSink){ + return syncTables.stream() + .map(v-> getTableListPrefix() + "\\." + v) + .collect(Collectors.joining("|")); + }else{ + // includingTablePattern and ^excludingPattern + String includingPattern = String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables); + if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) { + return includingPattern; + }else{ + String excludingPattern = String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables); + return String.format("(%s)(%s)", includingPattern, excludingPattern); + } + } + } + /** * Filter table that many tables merge to one */ diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 22e49aa..a3e01d3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -122,7 +122,9 @@ public class MysqlDatabaseSync extends DatabaseSync { .username(config.get(MySqlSourceOptions.USERNAME)) .password(config.get(MySqlSourceOptions.PASSWORD)) .databaseList(databaseName) - .tableList(databaseName + "." + tableName); + .tableList(tableName) + //default open add newly table + .scanNewlyAddedTableEnabled(true); config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId); config @@ -215,6 +217,12 @@ public class MysqlDatabaseSync extends DatabaseSync { mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source"); } + @Override + public String getTableListPrefix() { + String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME); + return databaseName; + } + /** * set chunkkeyColumn,eg: db.table1:column1,db.table2:column2 * @param sourceBuilder diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index 6e27eb6..92c1f95 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -174,7 +174,7 @@ public class OracleDatabaseSync extends DatabaseSync { .port(port) .databaseList(databaseName) .schemaList(schemaName) - .tableList(schemaName + "." + tableName) + .tableList(tableName) .username(username) .password(password) .includeSchemaChanges(true) @@ -199,7 +199,7 @@ public class OracleDatabaseSync extends DatabaseSync { .password(password) .database(databaseName) .schemaList(schemaName) - .tableList(schemaName + "." 
+                    .tableList(tableName)
                     .debeziumProperties(debeziumProperties)
                     .startupOptions(startupOptions)
                     .deserializer(schema)
@@ -207,4 +207,10 @@ public class OracleDatabaseSync extends DatabaseSync {
             return env.addSource(oracleSource, "Oracle Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index b8c9ad1..04b4127 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -159,7 +159,7 @@ public class PostgresDatabaseSync extends DatabaseSync {
                 .port(port)
                 .database(databaseName)
                 .schemaList(schemaName)
-                .tableList(schemaName + "." + tableName)
+                .tableList(tableName)
                 .username(username)
                 .password(password)
                 .deserializer(schema)
@@ -185,7 +185,7 @@ public class PostgresDatabaseSync extends DatabaseSync {
                     .port(port)
                     .database(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .debeziumProperties(debeziumProperties)
@@ -196,4 +196,10 @@ public class PostgresDatabaseSync extends DatabaseSync {
             return env.addSource(postgresSource, "Postgres Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index fb25212..10db3c1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -153,7 +153,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
                 .hostname(hostname)
                 .port(port)
                 .databaseList(databaseName)
-                .tableList(schemaName + "." + tableName)
+                .tableList(tableName)
                 .username(username)
                 .password(password)
                 .startupOptions(startupOptions)
@@ -175,7 +175,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
                     .hostname(hostname)
                     .port(port)
                     .database(databaseName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .debeziumProperties(debeziumProperties)
@@ -185,4 +185,10 @@ public class SqlServerDatabaseSync extends DatabaseSync {
             return env.addSource(sqlServerSource, "SqlServer Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index de549ef..32aedab 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -20,8 +20,9 @@ package org.apache.doris.flink.sink.writer;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -29,9 +30,8 @@ import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Field;
 import org.apache.doris.flink.rest.models.Schema;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -198,8 +198,10 @@ public class TestJsonDebeziumSchemaSerializer {
         String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n (\\t\\\"ID\ [...]
         JsonNode recordRoot = objectMapper.readTree(record);
+        serializer.setSourceConnector("oracle");
         List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
         Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+        serializer.setSourceConnector("mysql");
     }
 
     @Test
@@ -419,4 +421,27 @@ public class TestJsonDebeziumSchemaSerializer {
         dorisOptions.setTableIdentifier(tmp);
     }
 
+    @Test
+    @Ignore
+    public void testAutoCreateTable() throws Exception {
+        String record
+                = "{ \"source\":{ \"version\":\"1.9.7.Final\", \"connector\":\"oracle\", \"name\":\"oracle_logminer\", \"ts_ms\":1696945825065, \"snapshot\":\"true\", \"db\":\"TESTDB\", \"sequence\":null, \"schema\":\"ADMIN\", \"table\":\"PERSONS\", \"txId\":null, \"scn\":\"1199617\", \"commit_scn\":null, \"lcr_position\":null, \"rs_id\":null, \"ssn\":0, \"redo_thread\":null [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        dorisOptions = DorisOptions.builder().setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("")
+                .setUsername("root")
+                .setPassword("").build();
+        serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+        serializer.setSourceConnector(SourceConnector.ORACLE.connectorName);
+        TableSchema tableSchema = serializer.extractCreateTableSchema(recordRoot);
+        Assert.assertEquals("TESTDB", tableSchema.getDatabase());
+        Assert.assertEquals("PERSONS", tableSchema.getTable());
+        Assert.assertArrayEquals(new String[]{"ID"}, tableSchema.getKeys().toArray());
+        Assert.assertEquals(3, tableSchema.getFields().size());
+        Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
+        Assert.assertEquals("NAME4", tableSchema.getFields().get("NAME4").getName());
+        Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
+        serializer.setSourceConnector(SourceConnector.MYSQL.connectorName);
+    }
+
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index daab90b..fb2c8d6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -17,7 +17,10 @@
 package org.apache.doris.flink.tools.cdc;
 
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
 import org.junit.Test;
+
 import java.util.Arrays;
 
 /**
@@ -37,4 +40,17 @@ public class DatabaseSyncTest {
             databaseSync.multiToOneRulesParser(arr[0], arr[1]);
         });
     }
+
+    @Test
+    public void getSyncTableListTest() throws Exception{
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync.setSingleSink(false);
+        databaseSync.setIncludingTables("tbl_1|tbl_2");
+        Configuration config = new Configuration();
+        config.setString("database-name", "db");
+        config.setString("table-name", "tbl.*");
+        databaseSync.setConfig(config);
+        String syncTableList = databaseSync.getSyncTableList(Arrays.asList("tbl_1", "tbl_2"));
+        Assert.assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList);
+    }
 }
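For reference, the standalone sketch below mirrors the pattern-building logic of the new DatabaseSync#getSyncTableList so the two regex shapes it produces can be inspected outside the connector. The prefix "db", the table names, and the including/excluding rules "tbl_.*"/"tbl_3" are hypothetical values chosen for illustration; only the format strings are taken from the patch above.

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class SyncTableListSketch {
    public static void main(String[] args) {
        // getTableListPrefix(): the database for MySQL, the schema for Oracle/Postgres/SQLServer.
        String prefix = "db";
        List<String> syncTables = Arrays.asList("tbl_1", "tbl_2");

        // Multi-sink case: one escaped "prefix\.table" entry per synchronized table, joined with "|".
        String multiSink = syncTables.stream()
                .map(v -> prefix + "\\." + v)
                .collect(Collectors.joining("|"));
        System.out.println(multiSink);                               // db\.tbl_1|db\.tbl_2
        System.out.println(Pattern.matches(multiSink, "db.tbl_1"));  // true
        System.out.println(Pattern.matches(multiSink, "db.tbl_3"));  // false

        // Single-sink case: an including pattern, optionally followed by a lookahead built from the
        // excluding rule (the "(" added by the outer format plus "?!" forms "(?!...)").
        String includingTables = "tbl_.*";  // hypothetical including rule
        String excludingTables = "tbl_3";   // hypothetical excluding rule
        String includingPattern = String.format("(%s)\\.(%s)", prefix, includingTables);
        String excludingPattern = String.format("?!(%s\\.(%s))$", prefix, excludingTables);
        String singleSink = String.format("(%s)(%s)", includingPattern, excludingPattern);
        System.out.println(singleSink);     // ((db)\.(tbl_.*))(?!(db\.(tbl_3))$)
    }
}

The multi-sink output is exactly the string the new DatabaseSyncTest#getSyncTableListTest asserts ("db\\.tbl_1|db\\.tbl_2" as a Java literal).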