This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new ef4feeb [Bug] fix can not create table problem (#252) ef4feeb is described below commit ef4feeb17aabf007c6ba763d8a76f33a26baa7a3 Author: wudi <676366...@qq.com> AuthorDate: Thu Nov 30 19:32:58 2023 +0800 [Bug] fix can not create table problem (#252) Co-authored-by: wudi <> --- .../serializer/JsonDebeziumSchemaSerializer.java | 52 ++++++++++++++++------ .../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 + .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 4 +- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 29d9cee..25d06ef 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -93,6 +93,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private SchemaChangeManager schemaChangeManager; // <cdc db.schema.table, doris db.table> private Map<String, String> tableMapping; + // create table properties + private Map<String, String> tableProperties; + private String targetDatabase; public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, @@ -134,24 +137,19 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin String sourceTableName, boolean newSchemaChange, DorisExecutionOptions executionOptions, - Map<String, String> tableMapping) { + Map<String, String> tableMapping, + Map<String, String> tableProperties, + String targetDatabase) { this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions); this.tableMapping = tableMapping; + this.tableProperties = tableProperties; + this.targetDatabase = targetDatabase; } @Override public DorisRecord serialize(String record) throws IOException { LOG.debug("received debezium json data {} :", record); JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); - - //Filter out table records that are not in tableMapping - String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); - String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier); - if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){ - LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record); - return null; - } - String op = extractJsonNode(recordRoot, "op"); if (Objects.isNull(op)) { // schema change ddl @@ -166,6 +164,15 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin if (newSchemaChange && firstLoad) { initOriginFieldSchema(recordRoot); } + + //Filter out table records that are not in tableMapping + String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); + String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier); + if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){ + LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record); + return null; + } + Map<String, Object> valueMap; switch (op) { case OP_READ: @@ -313,11 +320,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin @VisibleForTesting public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException { + if(sourceConnector == null){ + sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase()); + } + String dorisTable = getCreateTableIdentifier(record); JsonNode tableChange = extractTableChange(record); JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames"); JsonNode columns = tableChange.get("table").get("columns"); - String tblComment = tableChange.get("table").get("comment").asText(); + JsonNode comment = tableChange.get("table").get("comment"); + String tblComment = comment == null ? "" : comment.asText(); Map<String, FieldSchema> field = new LinkedHashMap<>(); for (JsonNode column : columns) { buildFieldSchema(field, column); @@ -333,6 +345,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin tableSchema.setKeys(pkList); tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field)); tableSchema.setTableComment(tblComment); + tableSchema.setProperties(tableProperties); String[] split = dorisTable.split("\\."); Preconditions.checkArgument(split.length == 2); @@ -402,9 +415,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin } public String getCreateTableIdentifier(JsonNode record){ - String db = extractJsonNode(record.get("source"), "db"); String table = extractJsonNode(record.get("source"), "table"); - return db + "." + table; + return targetDatabase + "." + table; } public String getDorisTableIdentifier(String cdcTableIdentifier){ @@ -657,6 +669,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private boolean newSchemaChange; private DorisExecutionOptions executionOptions; private Map<String, String> tableMapping; + private Map<String, String> tableProperties; + private String targetDatabase; public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; @@ -688,9 +702,19 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return this; } + public Builder setTableProperties(Map<String, String> tableProperties) { + this.tableProperties = tableProperties; + return this; + } + + public Builder setTargetDatabase(String targetDatabase) { + this.targetDatabase = targetDatabase; + return this; + } + public JsonDebeziumSchemaSerializer build() { return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange, - executionOptions, tableMapping); + executionOptions, tableMapping, tableProperties, targetDatabase); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 02ab034..fe5357e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -242,6 +242,8 @@ public abstract class DatabaseSync { .setNewSchemaChange(newSchemaChange) .setExecutionOptions(executionOptions) .setTableMapping(tableMapping) + .setTableProperties(tableConfig) + .setTargetDatabase(database) .build()) .setDorisOptions(dorisBuilder.build()); return builder.build(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index a3e01d3..a86b2ab 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -122,9 +122,7 @@ public class MysqlDatabaseSync extends DatabaseSync { .username(config.get(MySqlSourceOptions.USERNAME)) .password(config.get(MySqlSourceOptions.PASSWORD)) .databaseList(databaseName) - .tableList(tableName) - //default open add newly table - .scanNewlyAddedTableEnabled(true); + .tableList(tableName); config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId); config --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org