This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a2c823c  [Improve]Optimize parameter configuration and ignore parameter case (#28)
a2c823c is described below

commit a2c823cbdcf77f92413166cbe6de4f237f83a657
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Jun 12 09:56:14 2024 +0800

    [Improve]Optimize parameter configuration and ignore parameter case (#28)
---
 .../doris/kafka/connector/DorisSinkConnector.java  |  3 +-
 .../doris/kafka/connector/cfg/DorisOptions.java    | 65 +++++-------------
 .../connector/cfg/DorisSinkConnectorConfig.java    | 38 +++++++++--
 .../kafka/connector/converter/ConverterMode.java   |  4 ++
 .../converter/schema/SchemaEvolutionMode.java      |  4 ++
 .../kafka/connector/utils/ConfigCheckUtils.java    | 49 ++++++++++++++
 .../kafka/connector/writer/DeliveryGuarantee.java  |  4 ++
 .../kafka/connector/writer/load/LoadModel.java     |  4 ++
 .../kafka/connector/cfg/TestDorisOptions.java      |  1 +
 .../cfg/TestDorisSinkConnectorConfig.java          | 77 ++++++++++++++++++++++
 .../connector/converter/TestRecordService.java     |  4 +-
 .../kafka/connector/writer/TestCopyIntoWriter.java |  2 +
 .../doris/kafka/connector/writer/TestCopyLoad.java |  2 +
 .../connector/writer/TestStreamLoadWriter.java     |  2 +
 14 files changed, 202 insertions(+), 57 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
index f6cf64b..bd1fe20 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
@@ -52,9 +52,8 @@ public class DorisSinkConnector extends SinkConnector {
     @Override
     public void start(final Map<String, String> parsedConfig) {
         LOG.info("doris sink connector start");
-        config = new HashMap<>(parsedConfig);
+        config = DorisSinkConnectorConfig.convertToLowercase(parsedConfig);
         DorisSinkConnectorConfig.setDefaultValues(config);
-
         ConfigCheckUtils.validateConfig(config);
     }
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 3db78ca..d5eaaf2 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -47,23 +47,23 @@ public class DorisOptions {
     private final Map<String, String> topicMap;
     private final int fileSize;
     private final int recordNum;
-    private long flushTime;
-    private boolean enableCustomJMX;
+    private final long flushTime;
+    private final boolean enableCustomJMX;
     private final int taskId;
     private final boolean enableDelete;
-    private boolean enable2PC;
+    private final boolean enable2PC;
     private boolean autoRedirect = true;
     private int requestReadTimeoutMs;
     private int requestConnectTimeoutMs;
     /** Properties for the StreamLoad. */
     private final Properties streamLoadProp = new Properties();
 
-    private String labelPrefix;
-    private String databaseTimeZone;
-    private LoadModel loadModel;
-    private DeliveryGuarantee deliveryGuarantee;
-    private ConverterMode converterMode;
-    private SchemaEvolutionMode schemaEvolutionMode;
+    @Deprecated private String labelPrefix;
+    private final String databaseTimeZone;
+    private final LoadModel loadModel;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final ConverterMode converterMode;
+    private final SchemaEvolutionMode schemaEvolutionMode;
 
     public DorisOptions(Map<String, String> config) {
         this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -74,54 +74,25 @@ public class DorisOptions {
         this.password = config.get(DorisSinkConnectorConfig.DORIS_PASSWORD);
         this.database = config.get(DorisSinkConnectorConfig.DORIS_DATABASE);
         this.taskId = Integer.parseInt(config.get(ConfigCheckUtils.TASK_ID));
-        this.databaseTimeZone = 
DorisSinkConnectorConfig.DATABASE_TIME_ZONE_DEFAULT;
-        if (config.containsKey(DorisSinkConnectorConfig.DATABASE_TIME_ZONE)) {
-            this.databaseTimeZone = 
config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE);
-        }
-        this.loadModel =
-                LoadModel.of(
-                        config.getOrDefault(
-                                DorisSinkConnectorConfig.LOAD_MODEL,
-                                DorisSinkConnectorConfig.LOAD_MODEL_DEFAULT));
+        this.databaseTimeZone = 
config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE);
+        this.loadModel = 
LoadModel.of(config.get(DorisSinkConnectorConfig.LOAD_MODEL));
         this.deliveryGuarantee =
-                DeliveryGuarantee.of(
-                        config.getOrDefault(
-                                DorisSinkConnectorConfig.DELIVERY_GUARANTEE,
-                                
DorisSinkConnectorConfig.DELIVERY_GUARANTEE_DEFAULT));
-        this.converterMode =
-                ConverterMode.of(
-                        config.getOrDefault(
-                                DorisSinkConnectorConfig.CONVERT_MODE,
-                                
DorisSinkConnectorConfig.CONVERT_MODE_DEFAULT));
+                
DeliveryGuarantee.of(config.get(DorisSinkConnectorConfig.DELIVERY_GUARANTEE));
+        this.converterMode = 
ConverterMode.of(config.get(DorisSinkConnectorConfig.CONVERTER_MODE));
         this.schemaEvolutionMode =
                 SchemaEvolutionMode.of(
-                        config.getOrDefault(
-                                
DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION,
-                                
DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT));
-
+                        
config.get(DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION));
         this.fileSize = 
Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES));
         this.recordNum =
                 
Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS));
 
         this.flushTime = 
Long.parseLong(config.get(DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC));
-        if (flushTime < DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN) {
-            LOG.warn(
-                    "flush time is {} seconds, it is smaller than the minimum 
flush time {} seconds, reset to the minimum flush time",
-                    flushTime,
-                    DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN);
-            this.flushTime = 
DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN;
-        }
         this.topicMap = getTopicToTableMap(config);
 
-        this.enable2PC = DorisSinkConnectorConfig.ENABLE_2PC_DEFAULT;
-        if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
-            this.enable2PC = 
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
-        }
-        enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT;
-        if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) {
-            enableCustomJMX = 
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
-        }
-        enableDelete = 
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE));
+        this.enable2PC = 
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
+        this.enableCustomJMX = 
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
+        this.enableDelete =
+                
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE));
         this.requestConnectTimeoutMs =
                 
DorisSinkConnectorConfig.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
         this.requestReadTimeoutMs = 
DorisSinkConnectorConfig.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 02c24d0..a204947 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -20,6 +20,8 @@
 package org.apache.doris.kafka.connector.cfg;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import org.apache.doris.kafka.connector.DorisSinkConnector;
 import org.apache.doris.kafka.connector.converter.ConverterMode;
@@ -77,7 +79,7 @@ public class DorisSinkConnectorConfig {
     public static final String AUTO_REDIRECT = "auto.redirect";
     public static final String DELIVERY_GUARANTEE = "delivery.guarantee";
     public static final String DELIVERY_GUARANTEE_DEFAULT = 
DeliveryGuarantee.AT_LEAST_ONCE.name();
-    public static final String CONVERT_MODE = "converter.mode";
+    public static final String CONVERTER_MODE = "converter.mode";
     public static final String CONVERT_MODE_DEFAULT = 
ConverterMode.NORMAL.getName();
 
     // Prefix for Doris StreamLoad specific properties.
@@ -100,15 +102,37 @@ public class DorisSinkConnectorConfig {
     private static final ConfigDef.Validator topicToTableValidator = new 
TopicToTableValidator();
 
     public static void setDefaultValues(Map<String, String> config) {
-        setFieldToDefaultValues(config, BUFFER_COUNT_RECORDS, 
BUFFER_COUNT_RECORDS_DEFAULT);
-        setFieldToDefaultValues(config, BUFFER_SIZE_BYTES, 
BUFFER_SIZE_BYTES_DEFAULT);
-        setFieldToDefaultValues(config, BUFFER_FLUSH_TIME_SEC, 
BUFFER_FLUSH_TIME_SEC_DEFAULT);
+        setFieldToDefaultValues(
+                config, BUFFER_COUNT_RECORDS, 
String.valueOf(BUFFER_COUNT_RECORDS_DEFAULT));
+        setFieldToDefaultValues(
+                config, BUFFER_SIZE_BYTES, 
String.valueOf(BUFFER_SIZE_BYTES_DEFAULT));
+        setFieldToDefaultValues(
+                config, BUFFER_FLUSH_TIME_SEC, 
String.valueOf(BUFFER_FLUSH_TIME_SEC_DEFAULT));
+        setFieldToDefaultValues(config, DATABASE_TIME_ZONE, 
DATABASE_TIME_ZONE_DEFAULT);
+        setFieldToDefaultValues(config, LOAD_MODEL, LOAD_MODEL_DEFAULT);
+        setFieldToDefaultValues(config, DELIVERY_GUARANTEE, 
DELIVERY_GUARANTEE_DEFAULT);
+        setFieldToDefaultValues(config, CONVERTER_MODE, CONVERT_MODE_DEFAULT);
+        setFieldToDefaultValues(
+                config, DEBEZIUM_SCHEMA_EVOLUTION, 
DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT);
+        setFieldToDefaultValues(config, ENABLE_2PC, 
String.valueOf(ENABLE_2PC_DEFAULT));
+        setFieldToDefaultValues(config, JMX_OPT, 
String.valueOf(JMX_OPT_DEFAULT));
     }
 
-    static void setFieldToDefaultValues(Map<String, String> config, String 
field, Long value) {
+    public static Map<String, String> convertToLowercase(Map<String, String> 
config) {
+        Map<String, String> newConfig = new HashMap<>();
+        for (Map.Entry<String, String> configEntry : config.entrySet()) {
+            String key = configEntry.getKey();
+            String value = configEntry.getValue();
+            newConfig.put(key.toLowerCase(Locale.ROOT), value);
+        }
+        return newConfig;
+    }
+
+    private static void setFieldToDefaultValues(
+            Map<String, String> config, String field, String value) {
         if (!config.containsKey(field)) {
-            config.put(field, value + "");
-            LOG.info("{} set to default {} seconds", field, value);
+            config.put(field, value);
+            LOG.info("Set the default value of {} to {}", field, value);
         }
     }
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java b/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
index c40f789..95f40f3 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
@@ -38,4 +38,8 @@ public enum ConverterMode {
     public String getName() {
         return name;
     }
+
+    public static String[] instances() {
+        return new String[] {NORMAL.name, DEBEZIUM_INGESTION.name};
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
index d9b6a9b..e6d77b2 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
@@ -37,4 +37,8 @@ public enum SchemaEvolutionMode {
     public String getName() {
         return name;
     }
+
+    public static String[] instances() {
+        return new String[] {NONE.name, BASIC.name};
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 2356f7d..3a6e583 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -23,8 +23,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.converter.ConverterMode;
+import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
 import org.apache.doris.kafka.connector.exception.ArgumentsException;
 import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
+import org.apache.doris.kafka.connector.writer.load.LoadModel;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigValue;
 import org.slf4j.Logger;
@@ -139,6 +143,42 @@ public class ConfigCheckUtils {
             configIsValid = false;
         }
 
+        String loadModel = config.get(DorisSinkConnectorConfig.LOAD_MODEL);
+        if (!validateEnumInstances(loadModel, LoadModel.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    loadModel,
+                    DorisSinkConnectorConfig.LOAD_MODEL);
+            configIsValid = false;
+        }
+
+        String deliveryGuarantee = 
config.get(DorisSinkConnectorConfig.DELIVERY_GUARANTEE);
+        if (!validateEnumInstances(deliveryGuarantee, 
DeliveryGuarantee.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    loadModel,
+                    DorisSinkConnectorConfig.DELIVERY_GUARANTEE);
+            configIsValid = false;
+        }
+
+        String converterMode = 
config.get(DorisSinkConnectorConfig.CONVERTER_MODE);
+        if (!validateEnumInstances(converterMode, ConverterMode.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    loadModel,
+                    DorisSinkConnectorConfig.CONVERTER_MODE);
+            configIsValid = false;
+        }
+
+        String schemaEvolutionMode = 
config.get(DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION);
+        if (!validateEnumInstances(schemaEvolutionMode, 
SchemaEvolutionMode.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    loadModel,
+                    DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION);
+            configIsValid = false;
+        }
+
         if (!configIsValid) {
             throw new DorisException(
                     "input kafka connector configuration is null, missing 
required values, or wrong input value");
@@ -254,4 +294,13 @@ public class ConfigCheckUtils {
     private static boolean isValidTableIdentifier(String tblName) {
         return tblName.matches("^[a-zA-Z][a-zA-Z0-9_]*$");
     }
+
+    private static boolean validateEnumInstances(String value, String[] 
instances) {
+        for (String instance : instances) {
+            if (instance.equalsIgnoreCase(value)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DeliveryGuarantee.java b/src/main/java/org/apache/doris/kafka/connector/writer/DeliveryGuarantee.java
index bea3cd2..f00c61f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DeliveryGuarantee.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DeliveryGuarantee.java
@@ -37,4 +37,8 @@ public enum DeliveryGuarantee {
     public String getName() {
         return name;
     }
+
+    public static String[] instances() {
+        return new String[] {EXACTLY_ONCE.name, AT_LEAST_ONCE.name};
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/LoadModel.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/LoadModel.java
index 4de2115..1b2cbc3 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/load/LoadModel.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/LoadModel.java
@@ -37,4 +37,8 @@ public enum LoadModel {
     public static LoadModel of(String name) {
         return LoadModel.valueOf(name.toUpperCase());
     }
+
+    public static String[] instances() {
+        return new String[] {STREAM_LOAD.name, COPY_INTO.name};
+    }
 }
diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisOptions.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisOptions.java
index 4b18372..20fe400 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisOptions.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisOptions.java
@@ -45,6 +45,7 @@ public class TestDorisOptions {
         props = new Properties();
         props.load(stream);
         props.put("task_id", "1");
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
     }
 
     @Test
diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index c09a3d9..eae3ec0 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -19,10 +19,13 @@
 
 package org.apache.doris.kafka.connector.cfg;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.doris.kafka.connector.exception.DorisException;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestDorisSinkConnectorConfig {
@@ -48,6 +51,7 @@ public class TestDorisSinkConnectorConfig {
         config.put(
                 DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC,
                 DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_DEFAULT + "");
+        DorisSinkConnectorConfig.setDefaultValues(config);
         return config;
     }
 
@@ -213,4 +217,77 @@ public class TestDorisSinkConnectorConfig {
         config.put(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS, 
"11adssadsa");
         ConfigCheckUtils.validateConfig(config);
     }
+
+    @Test(expected = DorisException.class)
+    public void testLoadModelException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.LOAD_MODEL, "stream_loada");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testLoadModel() {
+        Map<String, String> config = getConfig();
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testDeliveryGuarantee() {
+        Map<String, String> config = getConfig();
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testDeliveryGuaranteeException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.DELIVERY_GUARANTEE, 
"exactly_oncea");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testConverterMode() {
+        Map<String, String> config = getConfig();
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testConverterModeException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.CONVERTER_MODE, 
"debezium_ingestiona");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testSchemaEvolutionMode() {
+        Map<String, String> config = getConfig();
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testSchemaEvolutionModeException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION, 
"nonea");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testConvertToLowercase() {
+        Map<String, String> config = getConfig();
+        config.put("DELIVERY.guarantee", "at_least_once");
+        config.put("load.MODEL", "STREAM_LOAD");
+        config.put("DORIS.USER", "root");
+        config.put("Enable.deLete", "true");
+        config.put("doris.http.port", "8030");
+        Map<String, String> convertConfig = 
DorisSinkConnectorConfig.convertToLowercase(config);
+        List<String> result =
+                Arrays.asList(
+                        DorisSinkConnectorConfig.DELIVERY_GUARANTEE,
+                        DorisSinkConnectorConfig.LOAD_MODEL,
+                        DorisSinkConnectorConfig.DORIS_USER,
+                        DorisSinkConnectorConfig.ENABLE_DELETE,
+                        DorisSinkConnectorConfig.DORIS_HTTP_PORT);
+        for (String s : result) {
+            Assert.assertTrue(convertConfig.containsKey(s));
+        }
+    }
 }
diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 74af7c9..08e76ba 100644
--- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.converter.schema.SchemaChangeManager;
 import org.apache.doris.kafka.connector.exception.DorisException;
 import org.apache.doris.kafka.connector.model.doris.Schema;
@@ -70,6 +71,7 @@ public class TestRecordService {
                         .getClassLoader()
                         
.getResourceAsStream("doris-connector-sink.properties");
         props.load(stream);
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
         props.put("task_id", "1");
         props.put("converter.mode", "debezium_ingestion");
         props.put("debezium.schema.evolution", "basic");
@@ -176,7 +178,7 @@ public class TestRecordService {
 
     @Test
     public void processStructRecord() throws IOException {
-        props.remove("converter.mode");
+        props.put("converter.mode", "normal");
         recordService = new RecordService(new DorisOptions((Map) props));
         String topic = "normal.wdl_test.test_sink_normal";
 
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
index 2f06c93..302b9ea 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
 import org.apache.doris.kafka.connector.exception.CopyLoadException;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
@@ -59,6 +60,7 @@ public class TestCopyIntoWriter {
                         
.getResourceAsStream("doris-connector-sink.properties");
         Properties props = new Properties();
         props.load(stream);
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
         props.put("task_id", "1");
         props.put("name", "connect-test-sink299");
         props.put("load.model", "copy_into");
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyLoad.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyLoad.java
index 7ab29e0..bb92085 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyLoad.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyLoad.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.writer.load.CopyLoad;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -49,6 +50,7 @@ public class TestCopyLoad {
         Properties props = new Properties();
         props.load(stream);
         props.put("task_id", "1");
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
         options = new DorisOptions((Map) props);
     }
 
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index 6f09bf1..7e44a2d 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
 import org.apache.doris.kafka.connector.exception.StreamLoadException;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
@@ -59,6 +60,7 @@ public class TestStreamLoadWriter {
                         
.getResourceAsStream("doris-connector-sink.properties");
         Properties props = new Properties();
         props.load(stream);
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
         props.put("task_id", "1");
         props.put("name", "sink-connector-test");
         dorisOptions = new DorisOptions((Map) props);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to