This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8d30367 add jdbcProperties options (#146) 8d30367 is described below commit 8d30367fa9a9bfdf26ec4a2374405ab8e169c9f2 Author: wudi <676366...@qq.com> AuthorDate: Fri Jun 9 13:53:28 2023 +0800 add jdbcProperties options (#146) * add use SSL Co-authored-by: wudi <> --- .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 27 ++++++++++++++++------ 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index caa975d..629e6e1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -50,18 +50,19 @@ import java.util.Properties; public class MysqlDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class); + private static String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; + public MysqlDatabaseSync() { } @Override public Connection getConnection() throws SQLException { - return DriverManager.getConnection( - String.format( - "jdbc:mysql://%s:%d?useInformationSchema=true", - config.get(MySqlSourceOptions.HOSTNAME), - config.get(MySqlSourceOptions.PORT)), - config.get(MySqlSourceOptions.USERNAME), - config.get(MySqlSourceOptions.PASSWORD)); + Properties jdbcProperties = getJdbcProperties(); + StringBuilder jdbcUrlSb = new StringBuilder(JDBC_URL); + jdbcProperties.forEach((key, value) -> jdbcUrlSb.append("&").append(key).append("=").append(value)); + String jdbcUrl = String.format(jdbcUrlSb.toString(), config.get(MySqlSourceOptions.HOSTNAME), config.get(MySqlSourceOptions.PORT)); + + return DriverManager.getConnection(jdbcUrl,config.get(MySqlSourceOptions.USERNAME),config.get(MySqlSourceOptions.PASSWORD)); } @Override @@ -189,4 +190,16 @@ public class MysqlDatabaseSync extends DatabaseSync { mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source"); return streamSource; } + + private Properties getJdbcProperties(){ + Properties jdbcProps = new Properties(); + for (Map.Entry<String, String> entry : config.toMap().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) { + jdbcProps.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value); + } + } + return jdbcProps; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org