This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 73636562 [fix][mongodb-cdc]fix replace decimal type error when meeting non decimal type (#448)
73636562 is described below

commit 73636562f1ce21211b2e6c3e678d4481d0eff4de
Author: North Lin <37775475+qg-...@users.noreply.github.com>
AuthorDate: Fri Jul 26 16:12:40 2024 +0800

    [fix][mongodb-cdc]fix replace decimal type error when meeting non decimal type (#448)
---
 .../flink/tools/cdc/mongodb/MongoDBSchema.java     | 32 ++++++++++++++++------
 .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 11 ++++++++
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 41752c5e..08ec4509 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -22,14 +22,18 @@ import org.apache.flink.api.java.tuple.Tuple2;
 
 import org.apache.doris.flink.catalog.doris.DorisType;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 public class MongoDBSchema extends SourceSchema {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBSchema.class);
 
     public MongoDBSchema(
             ArrayList<Document> sampleData,
@@ -73,17 +77,27 @@ public class MongoDBSchema extends SourceSchema {
             int existingPrecision = existingPrecisionAndScale.f0;
             int existingScale = existingPrecisionAndScale.f1;
 
-            Tuple2<Integer, Integer> currentPrecisionAndScale =
-                    MongoDBType.getDecimalPrecisionAndScale(newDorisType);
-            int currentPrecision = currentPrecisionAndScale.f0;
-            int currentScale = currentPrecisionAndScale.f1;
+            try {
+                Tuple2<Integer, Integer> currentPrecisionAndScale =
+                        MongoDBType.getDecimalPrecisionAndScale(newDorisType);
+                int currentPrecision = currentPrecisionAndScale.f0;
+                int currentScale = currentPrecisionAndScale.f1;
 
-            int newScale = Math.max(existingScale, currentScale);
-            int newIntegerPartSize =
-                    Math.max(existingPrecision - existingScale, 
currentPrecision - currentScale);
-            int newPrecision = newIntegerPartSize + newScale;
+                int newScale = Math.max(existingScale, currentScale);
+                int newIntegerPartSize =
+                        Math.max(
+                                existingPrecision - existingScale, 
currentPrecision - currentScale);
+                int newPrecision = newIntegerPartSize + newScale;
 
-            return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + 
")";
+                return DorisType.DECIMAL + "(" + newPrecision + "," + newScale 
+ ")";
+            } catch (DorisRuntimeException e) {
+                LOG.warn(
+                        "Replace decimal type of field:{} failed, the newly 
type is:{}, rollback to existing type:{}",
+                        fieldName,
+                        newDorisType,
+                        existingField.getTypeString());
+                return existingField.getTypeString();
+            }
         }
         return newDorisType;
     }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index 57f7f470..75dadde8 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -44,4 +44,15 @@ public class MongoDBSchemaTest {
         String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", 
"DECIMALV3(12,8)");
         assertEquals("DECIMAL(15,8)", d);
     }
+
+    @Test
+    public void replaceDecimalTypeIfNeededWhenContainsNonDecimalType() throws 
Exception {
+        ArrayList<Document> documents = new ArrayList<>();
+        documents.add(new Document("fields1", 1234567.666666));
+        documents.add(new Document("fields1", 1234567));
+        documents.add(new Document("fields1", 1234567.7777777));
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", 
"test_table", "");
+        String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", 
"DECIMALV3(12,8)");
+        assertEquals("DECIMAL(15,8)", d);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to