This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new fb254846 [fix](cdc)fix excluding pattern not working (#390)
fb254846 is described below

commit fb25484677481af5a0d18b48f900122f883f614e
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Fri Jun 7 11:15:00 2024 +0800

    [fix](cdc)fix excluding pattern not working (#390)
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  2 +-
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   |  3 +--
 .../doris/flink/tools/cdc/DatabaseSyncTest.java    | 28 +++++++++++++++++++---
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index a1f511a8..691eaafa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -376,7 +376,7 @@ public abstract class DatabaseSync {
             } else {
                 String excludingPattern =
                         String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables);
-                return String.format("(%s)(%s)", includingPattern, excludingPattern);
+                return String.format("(%s)(%s)", excludingPattern, includingPattern);
             }
         }
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 03d3d076..63522472 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -230,8 +230,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
 
     @Override
     public String getTableListPrefix() {
-        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
-        return databaseName;
+        return config.get(MySqlSourceOptions.DATABASE_NAME);
     }
 
     /**
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index 1e69c598..f0cd0a51 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /** Unit tests for the {@link DatabaseSync}. */
 public class DatabaseSyncTest {
@@ -69,7 +71,7 @@ public class DatabaseSyncTest {
     public void getTableBucketsTest() throws SQLException {
         String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
         DatabaseSync databaseSync = new MysqlDatabaseSync();
-        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableBuckets);
+        Map<String, Integer> tableBucketsMap = DatabaseSync.getTableBuckets(tableBuckets);
         assertEquals(10, tableBucketsMap.get("tbl1").intValue());
         assertEquals(20, tableBucketsMap.get("tbl2").intValue());
         assertEquals(30, tableBucketsMap.get("a.*").intValue());
@@ -81,7 +83,7 @@ public class DatabaseSyncTest {
     public void setTableSchemaBucketsTest() throws SQLException {
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         String tableSchemaBuckets = "tbl1:10,tbl2:20,a11.*:30,a1.*:40,b.*:50,b1.*:60,.*:70";
-        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets);
+        Map<String, Integer> tableBucketsMap = DatabaseSync.getTableBuckets(tableSchemaBuckets);
         List<String> tableList =
                 Arrays.asList(
                         "tbl1", "tbl2", "tbl3", "a11", "a111", "a12", "a13", "b1", "b11", "b2",
@@ -103,7 +105,7 @@ public class DatabaseSyncTest {
     public void setTableSchemaBucketsTest1() throws SQLException {
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         String tableSchemaBuckets = ".*:10,a.*:20,tbl:30,b.*:40";
-        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets);
+        Map<String, Integer> tableBucketsMap = DatabaseSync.getTableBuckets(tableSchemaBuckets);
         List<String> tableList = Arrays.asList("a1", "a2", "a3", "b1", "a");
         HashMap<String, Integer> matchedTableBucketsMap = mockTableBuckets1();
         Set<String> tableSet = new HashSet<>();
@@ -147,4 +149,24 @@ public class DatabaseSyncTest {
         matchedTableBucketsMap.put("tbl1", 10);
         return matchedTableBucketsMap;
     }
+
+    @Test
+    public void singleSinkTablePatternTest() throws SQLException {
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync.setSingleSink(true);
+        databaseSync.setIncludingTables(".*");
+        databaseSync.setExcludingTables("customer|dates|lineorder");
+        Configuration config = new Configuration();
+        config.setString("database-name", "ssb_test");
+        databaseSync.setConfig(config);
+        List<String> tableList =
+                Arrays.asList("customer", "dates", "lineorder", "test1", "test2", "test3");
+        String syncTableListPattern = databaseSync.getSyncTableList(tableList);
+        assertTrue("ssb_test.test1".matches(syncTableListPattern));
+        assertTrue("ssb_test.test2".matches(syncTableListPattern));
+        assertTrue("ssb_test.test3".matches(syncTableListPattern));
+        assertFalse("ssb_test.customer".matches(syncTableListPattern));
+        assertFalse("ssb_test.dates".matches(syncTableListPattern));
+        assertFalse("ssb_test.lineorder".matches(syncTableListPattern));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org