This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch release-1.2.1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/release-1.2.1 by this push: new 5e26635 [improve] Schema change does not require db and tbl consistency (#87) (#89) 5e26635 is described below commit 5e2663502511dba55f3b761ef79eeff44f537762 Author: wudi <676366...@qq.com> AuthorDate: Wed Nov 30 11:31:36 2022 +0800 [improve] Schema change does not require db and tbl consistency (#87) (#89) * Optimizing schema changes --- .../sink/writer/JsonDebeziumSchemaSerializer.java | 89 +++++++++++++++------- .../writer/TestJsonDebeziumSchemaSerializer.java | 4 +- 2 files changed, 65 insertions(+), 28 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java index c3fe987..bd685ef 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java @@ -25,6 +25,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.sink.HttpGetWithEntity; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.StringUtils; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -56,14 +57,23 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private static final String OP_UPDATE = "u"; // update private static final String OP_DELETE = "d"; // delete - private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+COLUMN\\s+([^\\s]+).*"; + public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; //alter table tbl add cloumn aca int + private static final String addDropDDLRegex = 
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*"; private final Pattern addDropDDLPattern; private DorisOptions dorisOptions; private ObjectMapper objectMapper = new ObjectMapper(); + private String database; + private String table; + //table name of the cdc upstream, format is db.tbl + private String sourceTableName; - public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern) { + public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) { this.dorisOptions = dorisOptions; this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern; + String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); + this.database = tableInfo[0]; + this.table = tableInfo[1]; + this.sourceTableName = sourceTableName; } @Override @@ -97,8 +107,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin public boolean schemaChange(JsonNode recordRoot) { boolean status = false; try{ - boolean doSchemaChange = checkSchemaChange(recordRoot); - status = doSchemaChange && execSchemaChange(recordRoot); + if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){ + return false; + } + String ddl = extractDDL(recordRoot); + if(StringUtils.isNullOrWhitespaceOnly(ddl)){ + LOG.info("ddl can not do schema change:{}", recordRoot); + return false; + } + boolean doSchemaChange = checkSchemaChange(ddl); + status = doSchemaChange && execSchemaChange(ddl); LOG.info("schema change status:{}", status); }catch (Exception ex){ LOG.warn("schema change error :", ex); @@ -106,6 +124,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return status; } + /** + * When cdc synchronizes multiple tables, it will capture multiple table schema changes + */ + protected boolean checkTable(JsonNode recordRoot) { + String db = extractDatabase(recordRoot); + String tbl = 
extractTable(recordRoot); + String dbTbl = db + "." + tbl; + return sourceTableName.equals(dbTbl); + } + private void addDeleteSign(Map<String, String> valueMap, boolean delete) { if(delete){ valueMap.put(DORIS_DELETE_SIGN, "1"); @@ -114,11 +142,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin } } - private boolean checkSchemaChange(JsonNode record) throws IOException { - String database = extractDatabase(record); - String table = extractTable(record); + private boolean checkSchemaChange(String ddl) throws IOException { String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database, table); - Map<String,Object> param = buildRequestParam(record); + Map<String,Object> param = buildRequestParam(ddl); if(param.size() != 2){ return false; } @@ -139,27 +165,21 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin * "columnName" : "column" * } */ - private Map<String, Object> buildRequestParam(JsonNode record) throws JsonProcessingException { + protected Map<String, Object> buildRequestParam(String ddl) { Map<String,Object> params = new HashMap<>(); - String ddl = extractDDL(record); - if(ddl == null){ - return params; - } Matcher matcher = addDropDDLPattern.matcher(ddl); if(matcher.find()){ String op = matcher.group(1); - String col = matcher.group(2); + String col = matcher.group(3); params.put("isDropColumn", op.equalsIgnoreCase("DROP")); params.put("columnName", col); } return params; } - private boolean execSchemaChange(JsonNode record) throws IOException { - String extractDDL = extractDDL(record); + private boolean execSchemaChange(String ddl) throws IOException { Map<String, String> param = new HashMap<>(); - param.put("stmt", extractDDL); - String database = extractDatabase(record); + param.put("stmt", ddl); String requestUrl = String.format(SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database); HttpPost httpPost = new HttpPost(requestUrl); 
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); @@ -169,15 +189,20 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return success; } - private String extractDatabase(JsonNode record) { - return extractJsonNode(record.get("source"), "db"); + protected String extractDatabase(JsonNode record) { + if(record.get("source").has("schema")){ + //compatible with schema + return extractJsonNode(record.get("source"), "schema"); + }else{ + return extractJsonNode(record.get("source"), "db"); + } } - private String extractTable(JsonNode record) { + protected String extractTable(JsonNode record) { return extractJsonNode(record.get("source"), "table"); } - private boolean handleResponse(HttpUriRequest request) throws IOException { + private boolean handleResponse(HttpUriRequest request) { try (CloseableHttpClient httpclient = HttpClients.createDefault()) { CloseableHttpResponse response = httpclient.execute(request); final int statusCode = response.getStatusLine().getStatusCode(); @@ -215,20 +240,26 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return recordMap != null ? recordMap : new HashMap<>(); } - @VisibleForTesting public String extractDDL(JsonNode record) throws JsonProcessingException { String historyRecord = extractJsonNode(record, "historyRecord"); if (Objects.isNull(historyRecord)) { return null; } String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl"); + LOG.debug("received debezium ddl :{}", ddl); if (!Objects.isNull(ddl)) { //filter add/drop operation - if (addDropDDLPattern.matcher(ddl).matches()) { + Matcher matcher = addDropDDLPattern.matcher(ddl); + if(matcher.find()){ + String op = matcher.group(1); + String col = matcher.group(3); + String type = matcher.group(5); + type = type == null ? 
"" : type; + ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type); + LOG.info("parse ddl:{}", ddl); return ddl; } } - LOG.info("parse ddl:{}", ddl); return null; } @@ -246,6 +277,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin public static class Builder { private DorisOptions dorisOptions; private Pattern addDropDDLPattern; + private String sourceTableName; public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; @@ -257,8 +289,13 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return this; } + public JsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTableName) { + this.sourceTableName = sourceTableName; + return this; + } + public JsonDebeziumSchemaSerializer build() { - return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern); + return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName); } } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java index 0d35ae3..ed5c37f 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java @@ -100,8 +100,8 @@ public class TestJsonDebeziumSchemaSerializer { @Test public void testExtractDDL() throws IOException { - String srcDDL = "alter table t1 add \n column c_1 varchar(200)"; - String record = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...] + String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(200)"; + String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...] JsonNode recordRoot = objectMapper.readTree(record); String ddl = serializer.extractDDL(recordRoot); Assert.assertEquals(srcDDL, ddl); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org