This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new e76775a6 [fix](cdc) Fix possible loss of precision in MongoCDC Decimal sampling. (#463) e76775a6 is described below commit e76775a682a802eb80a795e31d82a65cf9cc7161 Author: bingquanzhao <bingquan_z...@icloud.com> AuthorDate: Thu Aug 8 17:27:04 2024 +0800 [fix](cdc) Fix possible loss of precision in MongoCDC Decimal sampling. (#463) --- .../flink/tools/cdc/mongodb/MongoDBSchema.java | 89 ++++++++++------ .../doris/flink/tools/cdc/mongodb/MongoDBType.java | 8 +- .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 115 +++++++++++++++++++-- 3 files changed, 173 insertions(+), 39 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java index 08ec4509..984419bc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java @@ -22,18 +22,31 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.FieldSchema; -import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; public class MongoDBSchema extends SourceSchema { private static final Logger LOG = LoggerFactory.getLogger(MongoDBSchema.class); + private static final List<String> CONVERT_TYPE = + Arrays.asList(DorisType.BIGINT, DorisType.INT, DorisType.SMALLINT, DorisType.TINYINT); + + public enum DecimalJudgement { + NOT_DECIMAL, + CERTAIN_DECIMAL, + CONVERT_TO_DECIMAL; + + public static boolean needProcessing(DecimalJudgement decimalJudgement) { + return !decimalJudgement.equals(NOT_DECIMAL); + } + } public MongoDBSchema( ArrayList<Document> sampleData, @@ -51,21 +64,50 @@ public class MongoDBSchema extends SourceSchema { primaryKeys.add("_id"); } - private void processSampleData(Document sampleData) { + @VisibleForTesting + protected void processSampleData(Document sampleData) { for (Map.Entry<String, Object> entry : sampleData.entrySet()) { String fieldName = entry.getKey(); Object value = entry.getValue(); - String dorisType = MongoDBType.toDorisType(value); - if (isDecimalField(fieldName)) { - dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType); - } + String dorisType = determineDorisType(fieldName, value); fields.put(fieldName, new FieldSchema(fieldName, dorisType, null)); } } - private boolean isDecimalField(String fieldName) { + private String determineDorisType(String fieldName, Object value) { + String dorisType = MongoDBType.toDorisType(value); + // Check if the type is string or if the existing field is a string type + FieldSchema existingField = fields.get(fieldName); + if (dorisType.equals(DorisType.STRING) + || (existingField != null + && existingField.getTypeString().equals(DorisType.STRING))) { + return DorisType.STRING; + } + // Check and process for decimal types + DecimalJudgement decimalJudgement = judgeDecimalField(fieldName, dorisType); + if (DecimalJudgement.needProcessing(decimalJudgement)) { + if (decimalJudgement == DecimalJudgement.CONVERT_TO_DECIMAL) { + int precision = value.toString().length(); + dorisType = MongoDBType.formatDecimalType(precision, 0); + } + dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType); + } + return dorisType; + } + + private DecimalJudgement judgeDecimalField(String fieldName, String dorisType) { FieldSchema existingField = fields.get(fieldName); - return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL); + if (existingField == null) { + return DecimalJudgement.NOT_DECIMAL; + } + boolean existDecimal = existingField.getTypeString().startsWith(DorisType.DECIMAL); + boolean isDecimal = dorisType.startsWith(DorisType.DECIMAL); + if (existDecimal && isDecimal) { + return DecimalJudgement.CERTAIN_DECIMAL; + } else if (CONVERT_TYPE.contains(dorisType)) { + return DecimalJudgement.CONVERT_TO_DECIMAL; + } + return DecimalJudgement.NOT_DECIMAL; } @VisibleForTesting @@ -76,28 +118,17 @@ public class MongoDBSchema extends SourceSchema { MongoDBType.getDecimalPrecisionAndScale(existingField.getTypeString()); int existingPrecision = existingPrecisionAndScale.f0; int existingScale = existingPrecisionAndScale.f1; + Tuple2<Integer, Integer> currentPrecisionAndScale = + MongoDBType.getDecimalPrecisionAndScale(newDorisType); + int currentPrecision = currentPrecisionAndScale.f0; + int currentScale = currentPrecisionAndScale.f1; - try { - Tuple2<Integer, Integer> currentPrecisionAndScale = - MongoDBType.getDecimalPrecisionAndScale(newDorisType); - int currentPrecision = currentPrecisionAndScale.f0; - int currentScale = currentPrecisionAndScale.f1; - - int newScale = Math.max(existingScale, currentScale); - int newIntegerPartSize = - Math.max( - existingPrecision - existingScale, currentPrecision - currentScale); - int newPrecision = newIntegerPartSize + newScale; - - return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; - } catch (DorisRuntimeException e) { - LOG.warn( - "Replace decimal type of field:{} failed, the newly type is:{}, rollback to existing type:{}", - fieldName, - newDorisType, - existingField.getTypeString()); - return existingField.getTypeString(); - } + int newScale = Math.max(existingScale, currentScale); + int newIntegerPartSize = + Math.max(existingPrecision - existingScale, currentPrecision - currentScale); + int newPrecision = newIntegerPartSize + newScale; + + return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; } return newDorisType; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java index bee85ced..578a407c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java @@ -118,9 +118,11 @@ public class MongoDBType { decimal = new BigDecimal(decimal.toPlainString()); } return decimal.precision() <= 38 - ? String.format( - "%s(%s,%s)", - DorisType.DECIMAL_V3, decimal.precision(), Math.max(decimal.scale(), 0)) + ? formatDecimalType(decimal.precision(), Math.max(decimal.scale(), 0)) : DorisType.STRING; } + + public static String formatDecimalType(int precision, int scale) { + return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java index 75dadde8..a9cc8bbc 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java @@ -17,10 +17,14 @@ package org.apache.doris.flink.tools.cdc.mongodb; +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.bson.Document; +import org.bson.types.Decimal128; import org.junit.Test; +import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -37,22 +41,119 @@ public class MongoDBSchemaTest { } @Test - public void replaceDecimalTypeIfNeeded() throws Exception { + public void replaceDecimalTypeIfNeededTest1() throws Exception { ArrayList<Document> documents = new ArrayList<>(); documents.add(new Document("fields1", 1234567.666666)); + documents.add(new Document("fields1", 123456789.88888888)); + MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", ""); - String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)"); - assertEquals("DECIMAL(15,8)", d); + Map<String, FieldSchema> fields = mongoDBSchema.getFields(); + for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) { + FieldSchema fieldSchema = entry.getValue(); + String fieldName = entry.getKey(); + if (fieldName.equals("fields1")) { + assertEquals("DECIMAL(17,8)", fieldSchema.getTypeString()); + } + } } @Test - public void replaceDecimalTypeIfNeededWhenContainsNonDecimalType() throws Exception { + public void replaceDecimalTypeIfNeededTest2() throws Exception { ArrayList<Document> documents = new ArrayList<>(); documents.add(new Document("fields1", 1234567.666666)); - documents.add(new Document("fields1", 1234567)); + documents.add(new Document("fields1", 123456789)); + + MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", ""); + Map<String, FieldSchema> fields = mongoDBSchema.getFields(); + for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) { + FieldSchema fieldSchema = entry.getValue(); + String fieldName = entry.getKey(); + if (fieldName.equals("fields1")) { + assertEquals("DECIMAL(15,6)", fieldSchema.getTypeString()); + } + } + } + + @Test + public void replaceDecimalTypeIfNeededTest3() throws Exception { + ArrayList<Document> documents = new ArrayList<>(); + documents.add(new Document("fields1", 1234567.666666)); + documents.add(new Document("fields1", 123456789)); documents.add(new Document("fields1", 1234567.7777777)); + documents.add( + new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999")))); + + MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", ""); + Map<String, FieldSchema> fields = mongoDBSchema.getFields(); + for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) { + FieldSchema fieldSchema = entry.getValue(); + String fieldName = entry.getKey(); + if (fieldName.equals("fields1")) { + assertEquals("DECIMAL(20,9)", fieldSchema.getTypeString()); + } + } + } + + @Test + public void replaceDecimalTypeIfNeededTest4() throws Exception { + ArrayList<Document> documents = new ArrayList<>(); + documents.add(new Document("fields1", "yes")); + documents.add(new Document("fields1", 1234567.666666)); + documents.add(new Document("fields1", 123456789)); + documents.add(new Document("fields1", 1234567.7777777)); + documents.add( + new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999")))); + + MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", ""); + Map<String, FieldSchema> fields = mongoDBSchema.getFields(); + for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) { + FieldSchema fieldSchema = entry.getValue(); + String fieldName = entry.getKey(); + if (fieldName.equals("fields1")) { + assertEquals("STRING", fieldSchema.getTypeString()); + } + } + } + + @Test + public void replaceDecimalTypeIfNeededTest5() throws Exception { + ArrayList<Document> documents = new ArrayList<>(); + documents.add(new Document("fields1", 1234567.666666)); + documents.add(new Document("fields1", 123456789)); + documents.add(new Document("fields1", 1234567.7777777)); + documents.add(new Document("fields1", "yes")); + documents.add( + new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999")))); + + MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", ""); + Map<String, FieldSchema> fields = mongoDBSchema.getFields(); + for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) { + FieldSchema fieldSchema = entry.getValue(); + String fieldName = entry.getKey(); + if (fieldName.equals("fields1")) { + assertEquals("STRING", fieldSchema.getTypeString()); + } + } + } + + @Test + public void replaceDecimalTypeIfNeededTest6() throws Exception { + ArrayList<Document> documents = new ArrayList<>(); + documents.add(new Document("fields1", 1234567.666666)); + documents.add(new Document("fields1", 123456789)); + documents.add(new Document("fields1", 1234567.7777777)); + documents.add(new Document("fields1", 123444555433445L)); + documents.add( + new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999")))); + MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", ""); - String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)"); - assertEquals("DECIMAL(15,8)", d); + Map<String, FieldSchema> fields = mongoDBSchema.getFields(); + for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) { + FieldSchema fieldSchema = entry.getValue(); + String fieldName = entry.getKey(); + if (fieldName.equals("fields1")) { + assertEquals("DECIMAL(24,9)", fieldSchema.getTypeString()); + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org