This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a00a3c2 [fix] fix the check of 'topic2table' configuration (#83)
a00a3c2 is described below
commit a00a3c24c56a739faeed4d143bb94aba22324b21
Author: wangchuang <[email protected]>
AuthorDate: Wed Aug 6 10:46:25 2025 +0800
[fix] fix the check of 'topic2table' configuration (#83)
---
.../org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java | 7 +++++--
.../doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java | 8 ++++++++
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index e1588c3..2a24f3f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -83,8 +83,8 @@ public class ConfigCheckUtils {
configIsValid = false;
}
- if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
- && parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP))
+ if (!config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
+ || parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP))
== null) {
LOG.error("{} is empty or invalid.", DorisSinkConnectorConfig.TOPICS_TABLES_MAP);
configIsValid = false;
@@ -289,6 +289,9 @@ public class ConfigCheckUtils {
}
public static Map<String, String> parseTopicToTableMap(String input) {
+ if (input == null) {
+ return null;
+ }
Map<String, String> topic2Table = new HashMap<>();
boolean isInvalid = false;
for (String str : input.split(",")) {
diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index e6633ec..b25c789 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -45,6 +45,7 @@ public class TestDorisSinkConnectorConfig {
config.put(DorisSinkConnectorConfig.DORIS_PASSWORD, "password");
config.put(DorisSinkConnectorConfig.DORIS_DATABASE, "testDatabase");
+ config.put(DorisSinkConnectorConfig.TOPICS_TABLES_MAP, "topic1:table1,topic2:table2");
config.put(
DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS,
DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS_DEFAULT + "");
@@ -64,6 +65,13 @@ public class TestDorisSinkConnectorConfig {
ConfigCheckUtils.validateConfig(config);
}
+ @Test(expected = DorisException.class)
+ public void testEmptyTopic2MapConfig() {
+ Map<String, String> config = getConfig();
+ config.remove(DorisSinkConnectorConfig.TOPICS_TABLES_MAP);
+ ConfigCheckUtils.validateConfig(config);
+ }
+
@Test(expected = DorisException.class)
public void testEmptyFlushTime() {
Map<String, String> config = getConfig();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]