This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eb2f136 [improve]cdc tools parameter removes leading and trailing spaces (#447)
8eb2f136 is described below

commit 8eb2f136263b9975af857077665be3eed5fc28e9
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Fri Jul 26 16:10:56 2024 +0800

    [improve]cdc tools parameter removes leading and trailing spaces (#447)
---
 .../org/apache/doris/flink/tools/cdc/CdcTools.java | 35 +++++-----
 .../apache/doris/flink/tools/cdc/CdcToolsTest.java | 74 ++++++++++++++++++++--
 2 files changed, 85 insertions(+), 24 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index b62f0f52..c91a0734 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -46,24 +46,25 @@ public class CdcTools {
         System.out.println("Input args: " + Arrays.asList(args) + ".\n");
         String operation = args[0].toLowerCase();
         String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
         switch (operation) {
             case DatabaseSyncConfig.MYSQL_SYNC_DATABASE:
-                createMySQLSyncDatabase(opArgs);
+                createMySQLSyncDatabase(params);
                 break;
             case DatabaseSyncConfig.ORACLE_SYNC_DATABASE:
-                createOracleSyncDatabase(opArgs);
+                createOracleSyncDatabase(params);
                 break;
             case DatabaseSyncConfig.POSTGRES_SYNC_DATABASE:
-                createPostgresSyncDatabase(opArgs);
+                createPostgresSyncDatabase(params);
                 break;
             case DatabaseSyncConfig.SQLSERVER_SYNC_DATABASE:
-                createSqlServerSyncDatabase(opArgs);
+                createSqlServerSyncDatabase(params);
                 break;
             case DatabaseSyncConfig.MONGODB_SYNC_DATABASE:
-                createMongoDBSyncDatabase(opArgs);
+                createMongoDBSyncDatabase(params);
                 break;
             case DatabaseSyncConfig.DB2_SYNC_DATABASE:
-                createDb2SyncDatabase(opArgs);
+                createDb2SyncDatabase(params);
                 break;
             default:
                 System.out.println("Unknown operation " + operation);
@@ -71,8 +72,7 @@ public class CdcTools {
         }
     }
 
-    private static void createMySQLSyncDatabase(String[] opArgs) throws 
Exception {
-        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+    private static void createMySQLSyncDatabase(MultipleParameterTool params) 
throws Exception {
         Preconditions.checkArgument(params.has(DatabaseSyncConfig.MYSQL_CONF));
         Map<String, String> mysqlMap = getConfigMap(params, 
DatabaseSyncConfig.MYSQL_CONF);
         Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
@@ -80,8 +80,7 @@ public class CdcTools {
         syncDatabase(params, databaseSync, mysqlConfig, SourceConnector.MYSQL);
     }
 
-    private static void createOracleSyncDatabase(String[] opArgs) throws 
Exception {
-        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+    private static void createOracleSyncDatabase(MultipleParameterTool params) 
throws Exception {
         
Preconditions.checkArgument(params.has(DatabaseSyncConfig.ORACLE_CONF));
         Map<String, String> oracleMap = getConfigMap(params, 
DatabaseSyncConfig.ORACLE_CONF);
         Configuration oracleConfig = Configuration.fromMap(oracleMap);
@@ -89,8 +88,7 @@ public class CdcTools {
         syncDatabase(params, databaseSync, oracleConfig, 
SourceConnector.ORACLE);
     }
 
-    private static void createPostgresSyncDatabase(String[] opArgs) throws 
Exception {
-        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+    private static void createPostgresSyncDatabase(MultipleParameterTool 
params) throws Exception {
         
Preconditions.checkArgument(params.has(DatabaseSyncConfig.POSTGRES_CONF));
         Map<String, String> postgresMap = getConfigMap(params, 
DatabaseSyncConfig.POSTGRES_CONF);
         Configuration postgresConfig = Configuration.fromMap(postgresMap);
@@ -98,8 +96,7 @@ public class CdcTools {
         syncDatabase(params, databaseSync, postgresConfig, 
SourceConnector.POSTGRES);
     }
 
-    private static void createSqlServerSyncDatabase(String[] opArgs) throws 
Exception {
-        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+    private static void createSqlServerSyncDatabase(MultipleParameterTool 
params) throws Exception {
         
Preconditions.checkArgument(params.has(DatabaseSyncConfig.SQLSERVER_CONF));
         Map<String, String> postgresMap = getConfigMap(params, 
DatabaseSyncConfig.SQLSERVER_CONF);
         Configuration postgresConfig = Configuration.fromMap(postgresMap);
@@ -107,8 +104,7 @@ public class CdcTools {
         syncDatabase(params, databaseSync, postgresConfig, 
SourceConnector.SQLSERVER);
     }
 
-    private static void createMongoDBSyncDatabase(String[] opArgs) throws 
Exception {
-        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+    private static void createMongoDBSyncDatabase(MultipleParameterTool 
params) throws Exception {
         
Preconditions.checkArgument(params.has(DatabaseSyncConfig.MONGODB_CONF));
         Map<String, String> mongoMap = getConfigMap(params, 
DatabaseSyncConfig.MONGODB_CONF);
         Configuration mongoConfig = Configuration.fromMap(mongoMap);
@@ -116,8 +112,7 @@ public class CdcTools {
         syncDatabase(params, databaseSync, mongoConfig, 
SourceConnector.MONGODB);
     }
 
-    private static void createDb2SyncDatabase(String[] opArgs) throws 
Exception {
-        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+    private static void createDb2SyncDatabase(MultipleParameterTool params) 
throws Exception {
         Preconditions.checkArgument(params.has(DatabaseSyncConfig.DB2_CONF));
         Map<String, String> db2Map = getConfigMap(params, 
DatabaseSyncConfig.DB2_CONF);
         Configuration db2Config = Configuration.fromMap(db2Map);
@@ -197,10 +192,10 @@ public class CdcTools {
         for (String param : params.getMultiParameter(key)) {
             String[] kv = param.split("=", 2);
             if (kv.length == 2) {
-                map.put(kv[0], kv[1]);
+                map.put(kv[0].trim(), kv[1].trim());
                 continue;
             } else if (kv.length == 1 && EMPTY_KEYS.contains(kv[0])) {
-                map.put(kv[0], "");
+                map.put(kv[0].trim(), "");
                 continue;
             }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java
index 5944ba52..41bcb621 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java
@@ -22,8 +22,14 @@ import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 public class CdcToolsTest {
 
@@ -32,21 +38,81 @@ public class CdcToolsTest {
         MultipleParameterTool params =
                 MultipleParameterTool.fromArgs(
                         new String[] {
-                            "--sink-conf", "fenodes=127.0.0.1:8030", 
"--sink-conf", "password="
+                            "--sink-conf",
+                            "fenodes = 127.0.0.1:8030",
+                            "--sink-conf",
+                            "password=",
+                            "--sink-conf",
+                            "jdbc-url= jdbc:mysql://127.0.0.1:9030 ",
+                            "--sink-conf",
+                            "sink.label-prefix  = label "
                         });
-        Map<String, String> sinkConf = CdcTools.getConfigMap(params, 
"sink-conf");
+        Map<String, String> sinkConf = CdcTools.getConfigMap(params, 
DatabaseSyncConfig.SINK_CONF);
 
         Map<String, String> excepted = new HashMap<>();
         excepted.put("password", "");
         excepted.put("fenodes", "127.0.0.1:8030");
+        excepted.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+        excepted.put("sink.label-prefix", "label");
         Assert.assertEquals(sinkConf, excepted);
 
-        Map<String, String> mysqlConf = CdcTools.getConfigMap(params, 
"--mysql-conf");
+        Map<String, String> mysqlConf =
+                CdcTools.getConfigMap(params, DatabaseSyncConfig.MYSQL_CONF);
         Assert.assertNull(mysqlConf);
 
         MultipleParameterTool params2 =
                 MultipleParameterTool.fromArgs(new String[] {"--sink-conf", 
"fenodes"});
-        Map<String, String> sinkConf2 = CdcTools.getConfigMap(params2, 
"sink-conf");
+        Map<String, String> sinkConf2 =
+                CdcTools.getConfigMap(params2, DatabaseSyncConfig.SINGLE_SINK);
         Assert.assertNull(sinkConf2);
     }
+
+    @Test
+    public void testGetConfigMap() {
+        Map<String, Collection<String>> config = new HashMap<>();
+        config.put(
+                DatabaseSyncConfig.MYSQL_CONF, Arrays.asList("  hostname=127.0.0.1", " port=3306"));
+        config.put(
+                DatabaseSyncConfig.POSTGRES_CONF,
+                Arrays.asList("hostname=127.0.0.1 ", "port=5432 "));
+        config.put(
+                DatabaseSyncConfig.SINK_CONF,
+                Arrays.asList(" fenodes=127.0.0.1:8030 ", " username=root"));
+        config.put(DatabaseSyncConfig.TABLE_CONF, Collections.singletonList("  replication_num=1"));
+        MultipleParameterTool parameter = 
MultipleParameterTool.fromMultiMap(config);
+        Map<String, String> mysqlConfigMap =
+                CdcTools.getConfigMap(parameter, 
DatabaseSyncConfig.MYSQL_CONF);
+        Map<String, String> postGresConfigMap =
+                CdcTools.getConfigMap(parameter, 
DatabaseSyncConfig.POSTGRES_CONF);
+        Map<String, String> sinkConfigMap =
+                CdcTools.getConfigMap(parameter, DatabaseSyncConfig.SINK_CONF);
+        Map<String, String> tableConfigMap =
+                CdcTools.getConfigMap(parameter, 
DatabaseSyncConfig.TABLE_CONF);
+
+        Set<String> mysqlKeyConf = new HashSet<>(Arrays.asList("hostname", 
"port"));
+        Set<String> mysqlValueConf = new HashSet<>(Arrays.asList("127.0.0.1", 
"3306"));
+        assertEquals(mysqlConfigMap, mysqlKeyConf, mysqlValueConf);
+
+        Set<String> postgresKeyConf = new HashSet<>(Arrays.asList("hostname", 
"port"));
+        Set<String> postgresValueConf = new 
HashSet<>(Arrays.asList("127.0.0.1", "5432"));
+        assertEquals(postGresConfigMap, postgresKeyConf, postgresValueConf);
+
+        Set<String> sinkKeyConf = new HashSet<>(Arrays.asList("fenodes", 
"username"));
+        Set<String> sinkValueConf = new 
HashSet<>(Arrays.asList("127.0.0.1:8030", "root"));
+        assertEquals(sinkConfigMap, sinkKeyConf, sinkValueConf);
+
+        Set<String> tableKeyConf = new 
HashSet<>(Collections.singletonList("replication_num"));
+        Set<String> tableValueConf = new 
HashSet<>(Collections.singletonList("1"));
+        assertEquals(tableConfigMap, tableKeyConf, tableValueConf);
+    }
+
+    private void assertEquals(
+            Map<String, String> actualMap, Set<String> keyConf, Set<String> 
valueConf) {
+        for (Entry<String, String> entry : actualMap.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            Assert.assertTrue(keyConf.contains(key));
+            Assert.assertTrue(valueConf.contains(value));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to