This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a00a3c2  [fix] fix the check of 'topic2table' configuration (#83)
a00a3c2 is described below

commit a00a3c24c56a739faeed4d143bb94aba22324b21
Author: wangchuang <[email protected]>
AuthorDate: Wed Aug 6 10:46:25 2025 +0800

    [fix] fix the check of 'topic2table' configuration (#83)
---
 .../org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java  | 7 +++++--
 .../doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java   | 8 ++++++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index e1588c3..2a24f3f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -83,8 +83,8 @@ public class ConfigCheckUtils {
             configIsValid = false;
         }
 
-        if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
-                && parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP))
+        if (!config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
+                || parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP))
                         == null) {
             LOG.error("{} is empty or invalid.", DorisSinkConnectorConfig.TOPICS_TABLES_MAP);
             configIsValid = false;
@@ -289,6 +289,9 @@ public class ConfigCheckUtils {
     }
 
     public static Map<String, String> parseTopicToTableMap(String input) {
+        if (input == null) {
+            return null;
+        }
         Map<String, String> topic2Table = new HashMap<>();
         boolean isInvalid = false;
         for (String str : input.split(",")) {
diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index e6633ec..b25c789 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -45,6 +45,7 @@ public class TestDorisSinkConnectorConfig {
         config.put(DorisSinkConnectorConfig.DORIS_PASSWORD, "password");
 
         config.put(DorisSinkConnectorConfig.DORIS_DATABASE, "testDatabase");
+        config.put(DorisSinkConnectorConfig.TOPICS_TABLES_MAP, "topic1:table1,topic2:table2");
         config.put(
                 DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS,
                 DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS_DEFAULT + "");
@@ -64,6 +65,13 @@ public class TestDorisSinkConnectorConfig {
         ConfigCheckUtils.validateConfig(config);
     }
 
+    @Test(expected = DorisException.class)
+    public void testEmptyTopic2MapConfig() {
+        Map<String, String> config = getConfig();
+        config.remove(DorisSinkConnectorConfig.TOPICS_TABLES_MAP);
+        ConfigCheckUtils.validateConfig(config);
+    }
+
     @Test(expected = DorisException.class)
     public void testEmptyFlushTime() {
         Map<String, String> config = getConfig();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to