This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d30367  add jdbcProperties options (#146)
8d30367 is described below

commit 8d30367fa9a9bfdf26ec4a2374405ab8e169c9f2
Author: wudi <676366...@qq.com>
AuthorDate: Fri Jun 9 13:53:28 2023 +0800

    add jdbcProperties options (#146)
    
    * add use SSL
    
    Co-authored-by: wudi <>
---
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   | 27 ++++++++++++++++------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index caa975d..629e6e1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -50,18 +50,19 @@ import java.util.Properties;
 public class MysqlDatabaseSync extends DatabaseSync {
     private static final Logger LOG = 
LoggerFactory.getLogger(MysqlDatabaseSync.class);
 
+    private static String JDBC_URL = 
"jdbc:mysql://%s:%d?useInformationSchema=true";
+
     public MysqlDatabaseSync() {
     }
 
     @Override
     public Connection getConnection() throws SQLException {
-        return DriverManager.getConnection(
-                String.format(
-                        "jdbc:mysql://%s:%d?useInformationSchema=true",
-                        config.get(MySqlSourceOptions.HOSTNAME),
-                        config.get(MySqlSourceOptions.PORT)),
-                config.get(MySqlSourceOptions.USERNAME),
-                config.get(MySqlSourceOptions.PASSWORD));
+        Properties jdbcProperties = getJdbcProperties();
+        StringBuilder jdbcUrlSb = new StringBuilder(JDBC_URL);
+        jdbcProperties.forEach((key, value) -> 
jdbcUrlSb.append("&").append(key).append("=").append(value));
+        String jdbcUrl = String.format(jdbcUrlSb.toString(), 
config.get(MySqlSourceOptions.HOSTNAME), config.get(MySqlSourceOptions.PORT));
+
+        return 
DriverManager.getConnection(jdbcUrl,config.get(MySqlSourceOptions.USERNAME),config.get(MySqlSourceOptions.PASSWORD));
     }
 
     @Override
@@ -189,4 +190,16 @@ public class MysqlDatabaseSync extends DatabaseSync {
                 mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
         return streamSource;
     }
+
+    private Properties getJdbcProperties(){
+        Properties jdbcProps = new Properties();
+        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
+                
jdbcProps.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
+            }
+        }
+        return jdbcProps;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to