This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8eb2f136 [improve]cdc tools parameter removes leading and trailing spaces (#447) 8eb2f136 is described below commit 8eb2f136263b9975af857077665be3eed5fc28e9 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri Jul 26 16:10:56 2024 +0800 [improve]cdc tools parameter removes leading and trailing spaces (#447) --- .../org/apache/doris/flink/tools/cdc/CdcTools.java | 35 +++++----- .../apache/doris/flink/tools/cdc/CdcToolsTest.java | 74 ++++++++++++++++++++-- 2 files changed, 85 insertions(+), 24 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index b62f0f52..c91a0734 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -46,24 +46,25 @@ public class CdcTools { System.out.println("Input args: " + Arrays.asList(args) + ".\n"); String operation = args[0].toLowerCase(); String[] opArgs = Arrays.copyOfRange(args, 1, args.length); + MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); switch (operation) { case DatabaseSyncConfig.MYSQL_SYNC_DATABASE: - createMySQLSyncDatabase(opArgs); + createMySQLSyncDatabase(params); break; case DatabaseSyncConfig.ORACLE_SYNC_DATABASE: - createOracleSyncDatabase(opArgs); + createOracleSyncDatabase(params); break; case DatabaseSyncConfig.POSTGRES_SYNC_DATABASE: - createPostgresSyncDatabase(opArgs); + createPostgresSyncDatabase(params); break; case DatabaseSyncConfig.SQLSERVER_SYNC_DATABASE: - createSqlServerSyncDatabase(opArgs); + createSqlServerSyncDatabase(params); break; case DatabaseSyncConfig.MONGODB_SYNC_DATABASE: - createMongoDBSyncDatabase(opArgs); + createMongoDBSyncDatabase(params); break; case DatabaseSyncConfig.DB2_SYNC_DATABASE: - createDb2SyncDatabase(opArgs); + createDb2SyncDatabase(params); break; default: System.out.println("Unknown operation " + operation); @@ -71,8 +72,7 @@ public class CdcTools { } } - private static void createMySQLSyncDatabase(String[] opArgs) throws Exception { - MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + private static void createMySQLSyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.MYSQL_CONF)); Map<String, String> mysqlMap = getConfigMap(params, DatabaseSyncConfig.MYSQL_CONF); Configuration mysqlConfig = Configuration.fromMap(mysqlMap); @@ -80,8 +80,7 @@ public class CdcTools { syncDatabase(params, databaseSync, mysqlConfig, SourceConnector.MYSQL); } - private static void createOracleSyncDatabase(String[] opArgs) throws Exception { - MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + private static void createOracleSyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.ORACLE_CONF)); Map<String, String> oracleMap = getConfigMap(params, DatabaseSyncConfig.ORACLE_CONF); Configuration oracleConfig = Configuration.fromMap(oracleMap); @@ -89,8 +88,7 @@ public class CdcTools { syncDatabase(params, databaseSync, oracleConfig, SourceConnector.ORACLE); } - private static void createPostgresSyncDatabase(String[] opArgs) throws Exception { - MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + private static void createPostgresSyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.POSTGRES_CONF)); Map<String, String> postgresMap = getConfigMap(params, DatabaseSyncConfig.POSTGRES_CONF); Configuration postgresConfig = Configuration.fromMap(postgresMap); @@ -98,8 +96,7 @@ public class CdcTools { syncDatabase(params, databaseSync, postgresConfig, SourceConnector.POSTGRES); } - private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception { - MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + private static void createSqlServerSyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.SQLSERVER_CONF)); Map<String, String> postgresMap = getConfigMap(params, DatabaseSyncConfig.SQLSERVER_CONF); Configuration postgresConfig = Configuration.fromMap(postgresMap); @@ -107,8 +104,7 @@ public class CdcTools { syncDatabase(params, databaseSync, postgresConfig, SourceConnector.SQLSERVER); } - private static void createMongoDBSyncDatabase(String[] opArgs) throws Exception { - MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + private static void createMongoDBSyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.MONGODB_CONF)); Map<String, String> mongoMap = getConfigMap(params, DatabaseSyncConfig.MONGODB_CONF); Configuration mongoConfig = Configuration.fromMap(mongoMap); @@ -116,8 +112,7 @@ public class CdcTools { syncDatabase(params, databaseSync, mongoConfig, SourceConnector.MONGODB); } - private static void createDb2SyncDatabase(String[] opArgs) throws Exception { - MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + private static void createDb2SyncDatabase(MultipleParameterTool params) throws Exception { Preconditions.checkArgument(params.has(DatabaseSyncConfig.DB2_CONF)); Map<String, String> db2Map = getConfigMap(params, DatabaseSyncConfig.DB2_CONF); Configuration db2Config = Configuration.fromMap(db2Map); @@ -197,10 +192,10 @@ public class CdcTools { for (String param : params.getMultiParameter(key)) { String[] kv = param.split("=", 2); if (kv.length == 2) { - map.put(kv[0], kv[1]); + map.put(kv[0].trim(), kv[1].trim()); continue; } else if (kv.length == 1 && EMPTY_KEYS.contains(kv[0])) { - map.put(kv[0], ""); + map.put(kv[0].trim(), ""); continue; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java index 5944ba52..41bcb621 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java @@ -22,8 +22,14 @@ import org.apache.flink.api.java.utils.MultipleParameterTool; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; public class CdcToolsTest { @@ -32,21 +38,81 @@ public class CdcToolsTest { MultipleParameterTool params = MultipleParameterTool.fromArgs( new String[] { - "--sink-conf", "fenodes=127.0.0.1:8030", "--sink-conf", "password=" + "--sink-conf", + "fenodes = 127.0.0.1:8030", + "--sink-conf", + "password=", + "--sink-conf", + "jdbc-url= jdbc:mysql://127.0.0.1:9030 ", + "--sink-conf", + "sink.label-prefix = label " }); - Map<String, String> sinkConf = CdcTools.getConfigMap(params, "sink-conf"); + Map<String, String> sinkConf = CdcTools.getConfigMap(params, DatabaseSyncConfig.SINK_CONF); Map<String, String> excepted = new HashMap<>(); excepted.put("password", ""); excepted.put("fenodes", "127.0.0.1:8030"); + excepted.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030"); + excepted.put("sink.label-prefix", "label"); Assert.assertEquals(sinkConf, excepted); - Map<String, String> mysqlConf = CdcTools.getConfigMap(params, "--mysql-conf"); + Map<String, String> mysqlConf = + CdcTools.getConfigMap(params, DatabaseSyncConfig.MYSQL_CONF); Assert.assertNull(mysqlConf); MultipleParameterTool params2 = MultipleParameterTool.fromArgs(new String[] {"--sink-conf", "fenodes"}); - Map<String, String> sinkConf2 = CdcTools.getConfigMap(params2, "sink-conf"); + Map<String, String> sinkConf2 = + CdcTools.getConfigMap(params2, DatabaseSyncConfig.SINGLE_SINK); Assert.assertNull(sinkConf2); } + + @Test + public void testGetConfigMap() { + Map<String, Collection<String>> config = new HashMap<>(); + config.put( + DatabaseSyncConfig.MYSQL_CONF, Arrays.asList(" hostname=127.0.0.1", " port=3306")); + config.put( + DatabaseSyncConfig.POSTGRES_CONF, + Arrays.asList("hostname=127.0.0.1 ", "port=5432 ")); + config.put( + DatabaseSyncConfig.SINK_CONF, + Arrays.asList(" fenodes=127.0.0.1:8030 ", " username=root")); + config.put(DatabaseSyncConfig.TABLE_CONF, Collections.singletonList(" replication_num=1")); + MultipleParameterTool parameter = MultipleParameterTool.fromMultiMap(config); + Map<String, String> mysqlConfigMap = + CdcTools.getConfigMap(parameter, DatabaseSyncConfig.MYSQL_CONF); + Map<String, String> postGresConfigMap = + CdcTools.getConfigMap(parameter, DatabaseSyncConfig.POSTGRES_CONF); + Map<String, String> sinkConfigMap = + CdcTools.getConfigMap(parameter, DatabaseSyncConfig.SINK_CONF); + Map<String, String> tableConfigMap = + CdcTools.getConfigMap(parameter, DatabaseSyncConfig.TABLE_CONF); + + Set<String> mysqlKeyConf = new HashSet<>(Arrays.asList("hostname", "port")); + Set<String> mysqlValueConf = new HashSet<>(Arrays.asList("127.0.0.1", "3306")); + assertEquals(mysqlConfigMap, mysqlKeyConf, mysqlValueConf); + + Set<String> postgresKeyConf = new HashSet<>(Arrays.asList("hostname", "port")); + Set<String> postgresValueConf = new HashSet<>(Arrays.asList("127.0.0.1", "5432")); + assertEquals(postGresConfigMap, postgresKeyConf, postgresValueConf); + + Set<String> sinkKeyConf = new HashSet<>(Arrays.asList("fenodes", "username")); + Set<String> sinkValueConf = new HashSet<>(Arrays.asList("127.0.0.1:8030", "root")); + assertEquals(sinkConfigMap, sinkKeyConf, sinkValueConf); + + Set<String> tableKeyConf = new HashSet<>(Collections.singletonList("replication_num")); + Set<String> tableValueConf = new HashSet<>(Collections.singletonList("1")); + assertEquals(tableConfigMap, tableKeyConf, tableValueConf); + } + + private void assertEquals( + Map<String, String> actualMap, Set<String> keyConf, Set<String> valueConf) { + for (Entry<String, String> entry : actualMap.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + Assert.assertTrue(keyConf.contains(key)); + Assert.assertTrue(valueConf.contains(value)); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org