This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 067eaf12 [improve](cdc) Optimize MongoCDC sampleSize calculation logic (#542)
067eaf12 is described below

commit 067eaf12fda5a31db0f6edd1ea1a33225ccbb870
Author: Qinghuang Xu <781240...@qq.com>
AuthorDate: Mon Jan 13 10:04:06 2025 +0800

    [improve](cdc) Optimize MongoCDC sampleSize calculation logic (#542)
---
 .../tools/cdc/mongodb/MongoDBDatabaseSync.java     | 32 ++++++++++++++-
 .../tools/cdc/mongodb/MongoDBDatabaseSyncTest.java | 48 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index 3526c075..ca034ac3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -68,6 +68,18 @@ public class MongoDBDatabaseSync extends DatabaseSync {
                     .defaultValue(0.2)
                     .withDescription("mongo cdc sample percent");
 
+    public static final ConfigOption<Long> MONGO_CDC_MIN_SAMPLE_SIZE =
+            ConfigOptions.key("schema.min-sample-size")
+                    .longType()
+                    .defaultValue(1000L)
+                    .withDescription("mongo cdc min sample size");
+
+    public static final ConfigOption<Long> MONGO_CDC_MAX_SAMPLE_SIZE =
+            ConfigOptions.key("schema.max-sample-size")
+                    .longType()
+                    .defaultValue(100000L)
+                    .withDescription("mongo cdc max sample size");
+
     public static final ConfigOption<String> TABLE_NAME =
             ConfigOptions.key("table-name")
                     .stringType()
@@ -101,6 +113,8 @@ public class MongoDBDatabaseSync extends DatabaseSync {
 
         MongoClientSettings settings = settingsBuilder.build();
         Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT);
+        Long minSampleSize = config.get(MONGO_CDC_MIN_SAMPLE_SIZE);
+        Long maxSampleSize = config.get(MONGO_CDC_MAX_SAMPLE_SIZE);
         try (MongoClient mongoClient = MongoClients.create(settings)) {
             MongoDatabase mongoDatabase = 
mongoClient.getDatabase(databaseName);
             MongoIterable<String> collectionNames = 
mongoDatabase.listCollectionNames();
@@ -115,7 +129,10 @@ public class MongoDBDatabaseSync extends DatabaseSync {
                 }
 
                 long totalDocuments = collection.estimatedDocumentCount();
-                long sampleSize = (long) Math.ceil(totalDocuments * 
samplePercent);
+                long sampleSize =
+                        calculateSampleSize(
+                                totalDocuments, samplePercent, minSampleSize, 
maxSampleSize);
+
                 ArrayList<Document> documents = sampleData(collection, 
sampleSize);
                 MongoDBSchema mongoDBSchema =
                         new MongoDBSchema(documents, databaseName, 
collectionName, null);
@@ -127,6 +144,19 @@ public class MongoDBDatabaseSync extends DatabaseSync {
         return schemaList;
     }
 
+    public long calculateSampleSize(
+            long totalDocuments, Double samplePercent, Long minSampleSize, 
Long maxSampleSize) {
+        if (totalDocuments < minSampleSize) {
+            return totalDocuments;
+        }
+        long sampleSize = (long) Math.ceil(totalDocuments * samplePercent);
+        // If the number of samples is less than the minimum threshold, the minimum threshold is
+        // used, while ensuring that the number of samples does not exceed the maximum threshold
+        sampleSize = Math.max(sampleSize, minSampleSize);
+        sampleSize = Math.min(sampleSize, maxSampleSize);
+        return sampleSize;
+    }
+
     private ArrayList<Document> sampleData(MongoCollection<Document> 
collection, Long sampleNum) {
         ArrayList<Document> query = new ArrayList<>();
         query.add(new Document("$sample", new Document("size", sampleNum)));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java
new file mode 100644
index 00000000..94c6c51c
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.SQLException;
+
+import static org.junit.Assert.assertEquals;
+
+public class MongoDBDatabaseSyncTest {
+    private MongoDBDatabaseSync mongoDBDatabaseSync;
+
+    @Before
+    public void init() throws SQLException {
+        mongoDBDatabaseSync = new MongoDBDatabaseSync();
+    }
+
+    @Test
+    public void testCalculateSampleSize() {
+        long sampleSize1 = mongoDBDatabaseSync.calculateSampleSize(100L, 0.2, 
1000L, 100000L);
+        long sampleSize2 = mongoDBDatabaseSync.calculateSampleSize(1000L, 0.2, 
1000L, 100000L);
+        long sampleSize3 = mongoDBDatabaseSync.calculateSampleSize(2000L, 0.2, 
1000L, 100000L);
+        long sampleSize4 = mongoDBDatabaseSync.calculateSampleSize(10000L, 
0.2, 1000L, 100000L);
+        long sampleSize5 = mongoDBDatabaseSync.calculateSampleSize(1000000L, 
0.2, 1000L, 100000L);
+        assertEquals(100, sampleSize1);
+        assertEquals(1000, sampleSize2);
+        assertEquals(1000, sampleSize3);
+        assertEquals(2000, sampleSize4);
+        assertEquals(100000, sampleSize5);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to