JNSimba commented on code in PR #248: URL: https://github.com/apache/doris-flink-connector/pull/248#discussion_r1410193059
########## flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java: ########## @@ -214,44 +216,70 @@ public boolean schemaChangeV2(JsonNode recordRoot) { return false; } - // db,table - Tuple2<String, String> tuple = getDorisTableTuple(recordRoot); - if(tuple == null){ - return false; - } - - List<String> ddlSqlList = extractDDLList(recordRoot); - if (CollectionUtils.isEmpty(ddlSqlList)) { - LOG.info("ddl can not do schema change:{}", recordRoot); + EventType eventType = extractEventType(recordRoot); + if(eventType == null){ return false; } - - List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas(); - for (int i = 0; i < ddlSqlList.size(); i++) { - DDLSchema ddlSchema = ddlSchemas.get(i); - String ddlSql = ddlSqlList.get(i); - boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema); - status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0); - LOG.info("schema change status:{}, ddl:{}", status, ddlSql); + if(eventType.equals(EventType.CREATE)){ + TableSchema tableSchema = extractCreateTableSchema(recordRoot); + status = schemaChangeManager.createTable(tableSchema); + if(status){ + String cdcTbl = getCdcTableIdentifier(recordRoot); + String dorisTbl = getCreateTableIdentifier(recordRoot); + tableMapping.put(cdcTbl, dorisTbl); + LOG.info("create table ddl status: {}", status); + } + } else if (eventType.equals(EventType.ALTER)){ + // db,table + Tuple2<String, String> tuple = getDorisTableTuple(recordRoot); + if(tuple == null){ + return false; + } + List<String> ddlSqlList = extractDDLList(recordRoot); + if (CollectionUtils.isEmpty(ddlSqlList)) { + LOG.info("ddl can not do schema change:{}", recordRoot); + return false; + } + List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas(); + for (int i = 0; i < ddlSqlList.size(); i++) { + DDLSchema ddlSchema = ddlSchemas.get(i); + String ddlSql = ddlSqlList.get(i); + boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema); + status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0); + LOG.info("schema change status:{}, ddl:{}", status, ddlSql); + } + } else{ + LOG.info("Unsupported event type {}", eventType); } } catch (Exception ex) { LOG.warn("schema change error :", ex); } return status; } + protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException { + JsonNode historyRecord = extractHistoryRecord(record); + JsonNode tableChanges = historyRecord.get("tableChanges"); + if(!Objects.isNull(tableChanges)){ + JsonNode tableChange = tableChanges.get(0); + return tableChange; + } + return null; + } + + /** + * Parse Alter Event + */ @VisibleForTesting - public List<String> extractDDLList(JsonNode record) throws JsonProcessingException { + public List<String> extractDDLList(JsonNode record) throws IOException{ String dorisTable = getDorisTableIdentifier(record); JsonNode historyRecord = extractHistoryRecord(record); - JsonNode tableChanges = historyRecord.get("tableChanges"); String ddl = extractJsonNode(historyRecord, "ddl"); - if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) { - return new ArrayList<>(); + JsonNode tableChange = extractTableChange(record); + if (Objects.isNull(tableChange) || Objects.isNull(ddl)) { + return null; } - LOG.debug("received debezium ddl :{}", ddl); - JsonNode tableChange = tableChanges.get(0); - if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) { + if(!EventType.ALTER.equals(extractEventType(record))){ Review Comment: Yes, removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org