This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 766b6905 [Chore] Split some case logic (#487)
766b6905 is described below

commit 766b69056a8796448352b3364b901086d402d74c
Author: wudi <676366...@qq.com>
AuthorDate: Wed Sep 11 15:27:28 2024 +0800

    [Chore] Split some case logic (#487)
---
 .../doris/flink/tools/cdc/DatabaseSyncConfig.java  |  2 --
 .../doris/flink/tools/cdc/DorisTableConfig.java    | 12 ++++++-----
 .../flink/container/e2e/Doris2DorisE2ECase.java    | 23 ++--------------------
 .../flink/tools/cdc/CdcDb2SyncDatabaseCase.java    |  4 ++--
 .../flink/tools/cdc/CdcMongoSyncDatabaseCase.java  |  4 ++--
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |  4 ++--
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |  4 ++--
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |  4 ++--
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    |  4 ++--
 9 files changed, 21 insertions(+), 40 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
index 6c78d5cd..e1a089ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
@@ -69,8 +69,6 @@ public class DatabaseSyncConfig {
     public static final String SINGLE_SINK = "single-sink";
     ////////// doris-table-conf //////////
     public static final String TABLE_CONF = "table-conf";
-    public static final String REPLICATION_NUM = "replication_num";
-    public static final String TABLE_BUCKETS = "table-buckets";
 
     ////////// date-converter-conf //////////
     public static final String CONVERTERS = "converters";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
index 912ed698..6318fc8a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
@@ -26,8 +26,11 @@ import java.util.Map;
 import java.util.Objects;
 
 public class DorisTableConfig implements Serializable {
-    private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+    public static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
     // PROPERTIES parameter in doris table creation statement. such as: replication_num=1.
+    public static final String REPLICATION_NUM = "replication_num";
+    public static final String TABLE_BUCKETS = "table-buckets";
+
     private final Map<String, String> tableProperties;
     // The specific parameters extracted from --table-conf need to be parsed and integrated into the
     // doris table creation statement. such as: table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
@@ -48,10 +51,9 @@ public class DorisTableConfig implements Serializable {
         if (!tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
             tableConfig.put(LIGHT_SCHEMA_CHANGE, Boolean.toString(true));
         }
-        if (tableConfig.containsKey(DatabaseSyncConfig.TABLE_BUCKETS)) {
-            this.tableBuckets =
-                    buildTableBucketMap(tableConfig.get(DatabaseSyncConfig.TABLE_BUCKETS));
-            tableConfig.remove(DatabaseSyncConfig.TABLE_BUCKETS);
+        if (tableConfig.containsKey(TABLE_BUCKETS)) {
+            this.tableBuckets = buildTableBucketMap(tableConfig.get(TABLE_BUCKETS));
+            tableConfig.remove(TABLE_BUCKETS);
         }
         tableProperties = tableConfig;
     }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
index 4b4e3b26..f7b3bee7 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
@@ -24,11 +24,9 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
-import org.apache.doris.flink.container.AbstractE2EService;
+import org.apache.doris.flink.container.AbstractContainerTestBase;
 import org.apache.doris.flink.container.ContainerUtils;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,19 +35,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-public class Doris2DorisE2ECase extends AbstractE2EService {
+public class Doris2DorisE2ECase extends AbstractContainerTestBase {
     private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class);
     private static final String DATABASE_SOURCE = "test_doris2doris_source";
     private static final String DATABASE_SINK = "test_doris2doris_sink";
     private static final String TABLE = "test_tbl";
 
-    @Before
-    public void setUp() throws InterruptedException {
-        LOG.info("Doris2DorisE2ECase attempting to acquire semaphore.");
-        SEMAPHORE.acquire();
-        LOG.info("Doris2DorisE2ECase semaphore acquired.");
-    }
-
     @Test
     public void testDoris2Doris() throws Exception {
         LOG.info("Start executing the test case of doris to doris.");
@@ -163,14 +154,4 @@ public class Doris2DorisE2ECase extends AbstractE2EService {
         ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sinkInitSql);
         LOG.info("Initialization of doris table successful.");
     }
-
-    @After
-    public void close() {
-        try {
-            // Ensure that semaphore is always released
-        } finally {
-            LOG.info("Doris2DorisE2ECase releasing semaphore.");
-            SEMAPHORE.release();
-        }
-    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
index 0666cb9d..05536557 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -65,8 +65,8 @@ public class CdcDb2SyncDatabaseCase {
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
-        tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "FULL_TYPES";
         String excludingTables = null;
         String multiToOneOrigin = null;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
index 09006b2c..ffc8a75d 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
@@ -68,8 +68,8 @@ public class CdcMongoSyncDatabaseCase {
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
-        tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, ".*:1");
+        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DorisTableConfig.TABLE_BUCKETS, ".*:1");
         String includingTables = "cdc_test";
         String excludingTables = "";
         String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index c430ea87..e85e888f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -67,8 +67,8 @@ public class CdcMysqlSyncDatabaseCase {
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
-        tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         // String includingTables = "tbl1|tbl2|tbl3";
         String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index a47212c8..92600ffd 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -62,8 +62,8 @@ public class CdcOraclelSyncDatabaseCase {
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
-        tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
         String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 99892e02..33184011 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -66,8 +66,8 @@ public class CdcPostgresSyncDatabaseCase {
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
-        tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
         String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index ca6a3121..7a1cf276 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -65,8 +65,8 @@ public class CdcSqlServerSyncDatabaseCase {
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
-        tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DorisTableConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
         String multiToOneOrigin = "a_.*|b_.*";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to