This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new fb254846 [fix](cdc)fix excluding pattern not working (#390)
fb254846 is described below

commit fb25484677481af5a0d18b48f900122f883f614e
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Fri Jun 7 11:15:00 2024 +0800

    [fix](cdc)fix excluding pattern not working (#390)
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  2 +-
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   |  3 +--
 .../doris/flink/tools/cdc/DatabaseSyncTest.java    | 28 +++++++++++++++++++---
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index a1f511a8..691eaafa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -376,7 +376,7 @@ public abstract class DatabaseSync {
             } else {
                 String excludingPattern =
                         String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables);
-                return String.format("(%s)(%s)", includingPattern, excludingPattern);
+                return String.format("(%s)(%s)", excludingPattern, includingPattern);
             }
         }
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 03d3d076..63522472 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -230,8 +230,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
 
     @Override
     public String getTableListPrefix() {
-        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
-        return databaseName;
+        return config.get(MySqlSourceOptions.DATABASE_NAME);
     }
 
     /**
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index 1e69c598..f0cd0a51 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /** Unit tests for the {@link DatabaseSync}. */
 public class DatabaseSyncTest {
@@ -69,7 +71,7 @@ public class DatabaseSyncTest {
     public void getTableBucketsTest() throws SQLException {
         String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
         DatabaseSync databaseSync = new MysqlDatabaseSync();
-        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableBuckets);
+        Map<String, Integer> tableBucketsMap = DatabaseSync.getTableBuckets(tableBuckets);
         assertEquals(10, tableBucketsMap.get("tbl1").intValue());
         assertEquals(20, tableBucketsMap.get("tbl2").intValue());
         assertEquals(30, tableBucketsMap.get("a.*").intValue());
@@ -81,7 +83,7 @@ public class DatabaseSyncTest {
     public void setTableSchemaBucketsTest() throws SQLException {
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         String tableSchemaBuckets = "tbl1:10,tbl2:20,a11.*:30,a1.*:40,b.*:50,b1.*:60,.*:70";
-        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets);
+        Map<String, Integer> tableBucketsMap = DatabaseSync.getTableBuckets(tableSchemaBuckets);
         List<String> tableList =
                 Arrays.asList(
                         "tbl1", "tbl2", "tbl3", "a11", "a111", "a12", "a13", "b1", "b11", "b2",
@@ -103,7 +105,7 @@ public class DatabaseSyncTest {
     public void setTableSchemaBucketsTest1() throws SQLException {
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         String tableSchemaBuckets = ".*:10,a.*:20,tbl:30,b.*:40";
-        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets);
+        Map<String, Integer> tableBucketsMap = DatabaseSync.getTableBuckets(tableSchemaBuckets);
         List<String> tableList = Arrays.asList("a1", "a2", "a3", "b1", "a");
         HashMap<String, Integer> matchedTableBucketsMap = mockTableBuckets1();
         Set<String> tableSet = new HashSet<>();
@@ -147,4 +149,24 @@ public class DatabaseSyncTest {
         matchedTableBucketsMap.put("tbl1", 10);
         return matchedTableBucketsMap;
     }
+
+    @Test
+    public void singleSinkTablePatternTest() throws SQLException {
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync.setSingleSink(true);
+        databaseSync.setIncludingTables(".*");
+        databaseSync.setExcludingTables("customer|dates|lineorder");
+        Configuration config = new Configuration();
+        config.setString("database-name", "ssb_test");
+        databaseSync.setConfig(config);
+        List<String> tableList =
+                Arrays.asList("customer", "dates", "lineorder", "test1", "test2", "test3");
+        String syncTableListPattern = databaseSync.getSyncTableList(tableList);
+        assertTrue("ssb_test.test1".matches(syncTableListPattern));
+        assertTrue("ssb_test.test2".matches(syncTableListPattern));
+        assertTrue("ssb_test.test3".matches(syncTableListPattern));
+        assertFalse("ssb_test.customer".matches(syncTableListPattern));
+        assertFalse("ssb_test.dates".matches(syncTableListPattern));
+        assertFalse("ssb_test.lineorder".matches(syncTableListPattern));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to