This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e76775a6 [fix](cdc) Fix possible loss of precision in MongoCDC Decimal sampling. (#463)
e76775a6 is described below

commit e76775a682a802eb80a795e31d82a65cf9cc7161
Author: bingquanzhao <bingquan_z...@icloud.com>
AuthorDate: Thu Aug 8 17:27:04 2024 +0800

    [fix](cdc) Fix possible loss of precision in MongoCDC Decimal sampling. (#463)
---
 .../flink/tools/cdc/mongodb/MongoDBSchema.java     |  89 ++++++++++------
 .../doris/flink/tools/cdc/mongodb/MongoDBType.java |   8 +-
 .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 115 +++++++++++++++++++--
 3 files changed, 173 insertions(+), 39 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 08ec4509..984419bc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -22,18 +22,31 @@ import org.apache.flink.api.java.tuple.Tuple2;
 
 import org.apache.doris.flink.catalog.doris.DorisType;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
-import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 public class MongoDBSchema extends SourceSchema {
     private static final Logger LOG = LoggerFactory.getLogger(MongoDBSchema.class);
+    private static final List<String> CONVERT_TYPE =
+            Arrays.asList(DorisType.BIGINT, DorisType.INT, DorisType.SMALLINT, DorisType.TINYINT);
+
+    public enum DecimalJudgement {
+        NOT_DECIMAL,
+        CERTAIN_DECIMAL,
+        CONVERT_TO_DECIMAL;
+
+        public static boolean needProcessing(DecimalJudgement decimalJudgement) {
+            return !decimalJudgement.equals(NOT_DECIMAL);
+        }
+    }
 
     public MongoDBSchema(
             ArrayList<Document> sampleData,
@@ -51,21 +64,50 @@ public class MongoDBSchema extends SourceSchema {
         primaryKeys.add("_id");
     }
 
-    private void processSampleData(Document sampleData) {
+    @VisibleForTesting
+    protected void processSampleData(Document sampleData) {
         for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
             String fieldName = entry.getKey();
             Object value = entry.getValue();
-            String dorisType = MongoDBType.toDorisType(value);
-            if (isDecimalField(fieldName)) {
-                dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType);
-            }
+            String dorisType = determineDorisType(fieldName, value);
             fields.put(fieldName, new FieldSchema(fieldName, dorisType, null));
         }
     }
 
-    private boolean isDecimalField(String fieldName) {
+    private String determineDorisType(String fieldName, Object value) {
+        String dorisType = MongoDBType.toDorisType(value);
+        // Check if the type is string or if the existing field is a string type
+        FieldSchema existingField = fields.get(fieldName);
+        if (dorisType.equals(DorisType.STRING)
+                || (existingField != null
+                        && existingField.getTypeString().equals(DorisType.STRING))) {
+            return DorisType.STRING;
+        }
+        // Check and process for decimal types
+        DecimalJudgement decimalJudgement = judgeDecimalField(fieldName, dorisType);
+        if (DecimalJudgement.needProcessing(decimalJudgement)) {
+            if (decimalJudgement == DecimalJudgement.CONVERT_TO_DECIMAL) {
+                int precision = value.toString().length();
+                dorisType = MongoDBType.formatDecimalType(precision, 0);
+            }
+            dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType);
+        }
+        return dorisType;
+    }
+
+    private DecimalJudgement judgeDecimalField(String fieldName, String dorisType) {
         FieldSchema existingField = fields.get(fieldName);
-        return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL);
+        if (existingField == null) {
+            return DecimalJudgement.NOT_DECIMAL;
+        }
+        boolean existDecimal = existingField.getTypeString().startsWith(DorisType.DECIMAL);
+        boolean isDecimal = dorisType.startsWith(DorisType.DECIMAL);
+        if (existDecimal && isDecimal) {
+            return DecimalJudgement.CERTAIN_DECIMAL;
+        } else if (CONVERT_TYPE.contains(dorisType)) {
+            return DecimalJudgement.CONVERT_TO_DECIMAL;
+        }
+        return DecimalJudgement.NOT_DECIMAL;
     }
 
     @VisibleForTesting
@@ -76,28 +118,17 @@ public class MongoDBSchema extends SourceSchema {
                     MongoDBType.getDecimalPrecisionAndScale(existingField.getTypeString());
             int existingPrecision = existingPrecisionAndScale.f0;
             int existingScale = existingPrecisionAndScale.f1;
+            Tuple2<Integer, Integer> currentPrecisionAndScale =
+                    MongoDBType.getDecimalPrecisionAndScale(newDorisType);
+            int currentPrecision = currentPrecisionAndScale.f0;
+            int currentScale = currentPrecisionAndScale.f1;
 
-            try {
-                Tuple2<Integer, Integer> currentPrecisionAndScale =
-                        MongoDBType.getDecimalPrecisionAndScale(newDorisType);
-                int currentPrecision = currentPrecisionAndScale.f0;
-                int currentScale = currentPrecisionAndScale.f1;
-
-                int newScale = Math.max(existingScale, currentScale);
-                int newIntegerPartSize =
-                        Math.max(
-                                existingPrecision - existingScale, currentPrecision - currentScale);
-                int newPrecision = newIntegerPartSize + newScale;
-
-                return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")";
-            } catch (DorisRuntimeException e) {
-                LOG.warn(
-                        "Replace decimal type of field:{} failed, the newly type is:{}, rollback to existing type:{}",
-                        fieldName,
-                        newDorisType,
-                        existingField.getTypeString());
-                return existingField.getTypeString();
-            }
+            int newScale = Math.max(existingScale, currentScale);
+            int newIntegerPartSize =
+                    Math.max(existingPrecision - existingScale, currentPrecision - currentScale);
+            int newPrecision = newIntegerPartSize + newScale;
+
+            return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")";
         }
         return newDorisType;
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
index bee85ced..578a407c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
@@ -118,9 +118,11 @@ public class MongoDBType {
             decimal = new BigDecimal(decimal.toPlainString());
         }
         return decimal.precision() <= 38
-                ? String.format(
-                        "%s(%s,%s)",
-                        DorisType.DECIMAL_V3, decimal.precision(), Math.max(decimal.scale(), 0))
+                ? formatDecimalType(decimal.precision(), Math.max(decimal.scale(), 0))
                 : DorisType.STRING;
     }
+
+    public static String formatDecimalType(int precision, int scale) {
+        return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale);
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index 75dadde8..a9cc8bbc 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -17,10 +17,14 @@
 
 package org.apache.doris.flink.tools.cdc.mongodb;
 
+import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.bson.Document;
+import org.bson.types.Decimal128;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -37,22 +41,119 @@ public class MongoDBSchemaTest {
     }
 
     @Test
-    public void replaceDecimalTypeIfNeeded() throws Exception {
+    public void replaceDecimalTypeIfNeededTest1() throws Exception {
         ArrayList<Document> documents = new ArrayList<>();
         documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789.88888888));
+
         MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
-        String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)");
-        assertEquals("DECIMAL(15,8)", d);
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("DECIMAL(17,8)", fieldSchema.getTypeString());
+            }
+        }
     }
 
     @Test
-    public void replaceDecimalTypeIfNeededWhenContainsNonDecimalType() throws Exception {
+    public void replaceDecimalTypeIfNeededTest2() throws Exception {
         ArrayList<Document> documents = new ArrayList<>();
         documents.add(new Document("fields1", 1234567.666666));
-        documents.add(new Document("fields1", 1234567));
+        documents.add(new Document("fields1", 123456789));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("DECIMAL(15,6)", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeededTest3() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789));
         documents.add(new Document("fields1", 1234567.7777777));
+        documents.add(
+                new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999"))));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("DECIMAL(20,9)", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeededTest4() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", "yes"));
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789));
+        documents.add(new Document("fields1", 1234567.7777777));
+        documents.add(
+                new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999"))));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("STRING", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeededTest5() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789));
+        documents.add(new Document("fields1", 1234567.7777777));
+        documents.add(new Document("fields1", "yes"));
+        documents.add(
+                new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999"))));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("STRING", fieldSchema.getTypeString());
+            }
+        }
+    }
+
+    @Test
+    public void replaceDecimalTypeIfNeededTest6() throws Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 123456789));
+        documents.add(new Document("fields1", 1234567.7777777));
+        documents.add(new Document("fields1", 123444555433445L));
+        documents.add(
+                new Document("fields1", new Decimal128(new BigDecimal("12345679012.999999999"))));
+
         MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", "");
-        String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)");
-        assertEquals("DECIMAL(15,8)", d);
+        Map<String, FieldSchema> fields = mongoDBSchema.getFields();
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema fieldSchema = entry.getValue();
+            String fieldName = entry.getKey();
+            if (fieldName.equals("fields1")) {
+                assertEquals("DECIMAL(24,9)", fieldSchema.getTypeString());
+            }
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to