This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 800755e [log](improve)Add printing parameters and move the verification forward (#295) 800755e is described below commit 800755e24ddc6d589b11eba622702fab2b858b17 Author: wudi <676366...@qq.com> AuthorDate: Wed Jan 17 10:09:43 2024 +0800 [log](improve)Add printing parameters and move the verification forward (#295) --- .../org/apache/doris/flink/tools/cdc/CdcTools.java | 19 ++++++++++-- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +++- .../java/org/apache/doris/flink/DorisTestBase.java | 34 ++++++++++++++++++++-- .../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 2 ++ 4 files changed, 54 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 9caddd2..c76506a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; @@ -42,9 +43,9 @@ public class CdcTools { private static final List<String> EMPTY_KEYS = Collections.singletonList("password"); public static void main(String[] args) throws Exception { + System.out.println("Input args: " + Arrays.asList(args) + ".\n"); String operation = args[0].toLowerCase(); String[] opArgs = Arrays.copyOfRange(args, 1, args.length); - System.out.println(); switch (operation) { case MYSQL_SYNC_DATABASE: createMySQLSyncDatabase(opArgs); @@ -66,6 +67,7 @@ public class CdcTools { private static void createMySQLSyncDatabase(String[] opArgs) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("mysql-conf")); Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf"); Configuration mysqlConfig = Configuration.fromMap(mysqlMap); DatabaseSync databaseSync = new MysqlDatabaseSync(); @@ -74,6 +76,7 @@ public class CdcTools { private static void createOracleSyncDatabase(String[] opArgs) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("oracle-conf")); Map<String, String> oracleMap = getConfigMap(params, "oracle-conf"); Configuration oracleConfig = Configuration.fromMap(oracleMap); DatabaseSync databaseSync = new OracleDatabaseSync(); @@ -82,6 +85,7 @@ public class CdcTools { private static void createPostgresSyncDatabase(String[] opArgs) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("postgres-conf")); Map<String, String> postgresMap = getConfigMap(params, "postgres-conf"); Configuration postgresConfig = Configuration.fromMap(postgresMap); DatabaseSync databaseSync = new PostgresDatabaseSync(); @@ -90,6 +94,7 @@ public class CdcTools { private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("sqlserver-conf")); Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf"); Configuration postgresConfig = Configuration.fromMap(postgresMap); DatabaseSync databaseSync = new SqlServerDatabaseSync(); @@ -115,6 +120,7 @@ public class CdcTools { boolean useNewSchemaChange = params.has("use-new-schema-change"); boolean singleSink = params.has("single-sink"); + Preconditions.checkArgument(params.has("sink-conf")); Map<String, String> sinkMap = getConfigMap(params, "sink-conf"); Map<String, String> tableMap = getConfigMap(params, "table-conf"); Configuration sinkConfig = Configuration.fromMap(sinkMap); @@ -149,7 +155,13 @@ public class CdcTools { private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) { if (!params.has(key)) { - return new HashMap<>(); + System.out.println( + "Can not find key [" + + key + + "] from args: " + + params.toMap().toString() + + ".\n"); + return null; } Map<String, String> map = new HashMap<>(); @@ -163,7 +175,8 @@ public class CdcTools { continue; } - System.err.println("Invalid " + key + " " + param + ".\n"); + System.out.println("Invalid " + key + " " + param + ".\n"); + return null; } return map; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index f1e172b..b6bb5e5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -441,7 +442,9 @@ public abstract class DatabaseSync { } public DatabaseSync setTableConfig(Map<String, String> tableConfig) { - this.tableConfig = tableConfig; + if (!CollectionUtil.isNullOrEmpty(tableConfig)) { + this.tableConfig = tableConfig; + } return this; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java index e3478da..278be8c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java @@ -28,15 +28,20 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerLoggerFactory; -import java.net.MalformedURLException; +import java.net.InetAddress; import java.net.URL; import java.net.URLClassLoader; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; import java.util.stream.Stream; @@ -107,7 +112,7 @@ public abstract class DorisTestBase { return container; } - protected static void initializeJdbcConnection() throws SQLException, MalformedURLException { + protected static void initializeJdbcConnection() throws Exception { URLClassLoader urlClassLoader = new URLClassLoader( new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader()); @@ -124,6 +129,7 @@ public abstract class DorisTestBase { } while (!isBeReady(resultSet, Duration.ofSeconds(1L))); } LOG.info("Connected to Doris successfully..."); + printClusterStatus(); } private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException { @@ -135,4 +141,28 @@ public abstract class DorisTestBase { } return false; } + + protected static void printClusterStatus() throws Exception { + LOG.info("Current machine IP: {}", InetAddress.getLocalHost()); + try (Statement statement = connection.createStatement()) { + ResultSet showFrontends = statement.executeQuery("show frontends"); + LOG.info("Frontends status: {}", convertList(showFrontends)); + ResultSet showBackends = statement.executeQuery("show backends"); + LOG.info("Backends status: {}", convertList(showBackends)); + } + } + + private static List<Map> convertList(ResultSet rs) throws SQLException { + List<Map> list = new ArrayList<>(); + ResultSetMetaData metaData = rs.getMetaData(); + int columnCount = metaData.getColumnCount(); + while (rs.next()) { + Map<String, Object> rowData = new HashMap<>(); + for (int i = 1; i <= columnCount; i++) { + rowData.put(metaData.getColumnName(i), rs.getObject(i)); + } + list.add(rowData); + } + return list; + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java index 2f5568e..242f93f 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java @@ -94,6 +94,7 @@ public class MySQLDorisE2ECase extends DorisTestBase { @Test public void testMySQL2Doris() throws Exception { + printClusterStatus(); initializeMySQLTable(); JobClient jobClient = submitJob(); // wait 2 times checkpoint @@ -173,6 +174,7 @@ public class MySQLDorisE2ECase extends DorisTestBase { @Test public void testAutoAddTable() throws Exception { + printClusterStatus(); initializeMySQLTable(); initializeDorisTable(); JobClient jobClient = submitJob(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org