This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 73636562 [fix][mongodb-cdc]fix replace decimal type error when meeting non decimal type (#448) 73636562 is described below commit 73636562f1ce21211b2e6c3e678d4481d0eff4de Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Fri Jul 26 16:12:40 2024 +0800 [fix][mongodb-cdc]fix replace decimal type error when meeting non decimal type (#448) --- .../flink/tools/cdc/mongodb/MongoDBSchema.java | 32 ++++++++++++++++------ .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 11 ++++++++ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java index 41752c5e..08ec4509 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java @@ -22,14 +22,18 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.Map; public class MongoDBSchema extends SourceSchema { + private static final Logger LOG = LoggerFactory.getLogger(MongoDBSchema.class); public MongoDBSchema( ArrayList<Document> sampleData, @@ -73,17 +77,27 @@ public class MongoDBSchema extends SourceSchema { int existingPrecision = existingPrecisionAndScale.f0; int existingScale = existingPrecisionAndScale.f1; - Tuple2<Integer, Integer> currentPrecisionAndScale = - MongoDBType.getDecimalPrecisionAndScale(newDorisType); - int currentPrecision = currentPrecisionAndScale.f0; - int currentScale = currentPrecisionAndScale.f1; + try { + Tuple2<Integer, Integer> currentPrecisionAndScale = + MongoDBType.getDecimalPrecisionAndScale(newDorisType); + int currentPrecision = currentPrecisionAndScale.f0; + int currentScale = currentPrecisionAndScale.f1; - int newScale = Math.max(existingScale, currentScale); - int newIntegerPartSize = - Math.max(existingPrecision - existingScale, currentPrecision - currentScale); - int newPrecision = newIntegerPartSize + newScale; + int newScale = Math.max(existingScale, currentScale); + int newIntegerPartSize = + Math.max( + existingPrecision - existingScale, currentPrecision - currentScale); + int newPrecision = newIntegerPartSize + newScale; - return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; + return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; + } catch (DorisRuntimeException e) { + LOG.warn( + "Replace decimal type of field:{} failed, the newly type is:{}, rollback to existing type:{}", + fieldName, + newDorisType, + existingField.getTypeString()); + return existingField.getTypeString(); + } } return newDorisType; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java index 57f7f470..75dadde8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java @@ -44,4 +44,15 @@ public class MongoDBSchemaTest { String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)"); assertEquals("DECIMAL(15,8)", d); } + + @Test + public void replaceDecimalTypeIfNeededWhenContainsNonDecimalType() throws Exception { + ArrayList<Document> documents = new ArrayList<>(); + documents.add(new Document("fields1", 1234567.666666)); + documents.add(new Document("fields1", 1234567)); + documents.add(new Document("fields1", 1234567.7777777)); + MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, "db_TEST", "test_table", ""); + String d = mongoDBSchema.replaceDecimalTypeIfNeeded("fields1", "DECIMALV3(12,8)"); + assertEquals("DECIMAL(15,8)", d); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org