This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new a2c823c  [Improve]Optimize parameter configuration and ignore parameter case (#28)
a2c823c is described below

commit a2c823cbdcf77f92413166cbe6de4f237f83a657
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Jun 12 09:56:14 2024 +0800

    [Improve]Optimize parameter configuration and ignore parameter case (#28)
---
 .../doris/kafka/connector/DorisSinkConnector.java  |  3 +-
 .../doris/kafka/connector/cfg/DorisOptions.java    | 65 +++++-------------
 .../connector/cfg/DorisSinkConnectorConfig.java    | 38 +++++++++--
 .../kafka/connector/converter/ConverterMode.java   |  4 ++
 .../converter/schema/SchemaEvolutionMode.java      |  4 ++
 .../kafka/connector/utils/ConfigCheckUtils.java    | 49 ++++++++++++++
 .../kafka/connector/writer/DeliveryGuarantee.java  |  4 ++
 .../kafka/connector/writer/load/LoadModel.java     |  4 ++
 .../kafka/connector/cfg/TestDorisOptions.java      |  1 +
 .../cfg/TestDorisSinkConnectorConfig.java          | 77 ++++++++++++++++++++++
 .../connector/converter/TestRecordService.java     |  4 +-
 .../kafka/connector/writer/TestCopyIntoWriter.java |  2 +
 .../doris/kafka/connector/writer/TestCopyLoad.java |  2 +
 .../connector/writer/TestStreamLoadWriter.java     |  2 +
 14 files changed, 202 insertions(+), 57 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
index f6cf64b..bd1fe20 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkConnector.java
@@ -52,9 +52,8 @@ public class DorisSinkConnector extends SinkConnector {
     @Override
     public void start(final Map<String, String> parsedConfig) {
         LOG.info("doris sink connector start");
-        config = new HashMap<>(parsedConfig);
+        config = DorisSinkConnectorConfig.convertToLowercase(parsedConfig);
         DorisSinkConnectorConfig.setDefaultValues(config);
-
         ConfigCheckUtils.validateConfig(config);
     }

diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 3db78ca..d5eaaf2 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -47,23 +47,23 @@ public class DorisOptions {
     private final Map<String, String> topicMap;
     private final int fileSize;
     private final int recordNum;
-    private long flushTime;
-    private boolean enableCustomJMX;
+    private final long flushTime;
+    private final boolean enableCustomJMX;
     private final int taskId;
     private final boolean enableDelete;
-    private boolean enable2PC;
+    private final boolean enable2PC;
     private boolean autoRedirect = true;
     private int requestReadTimeoutMs;
     private int requestConnectTimeoutMs;
     /** Properties for the StreamLoad. */
     private final Properties streamLoadProp = new Properties();
-    private String labelPrefix;
-    private String databaseTimeZone;
-    private LoadModel loadModel;
-    private DeliveryGuarantee deliveryGuarantee;
-    private ConverterMode converterMode;
-    private SchemaEvolutionMode schemaEvolutionMode;
+    @Deprecated private String labelPrefix;
+    private final String databaseTimeZone;
+    private final LoadModel loadModel;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final ConverterMode converterMode;
+    private final SchemaEvolutionMode schemaEvolutionMode;

     public DorisOptions(Map<String, String> config) {
         this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -74,54 +74,25 @@ public class DorisOptions {
         this.password = config.get(DorisSinkConnectorConfig.DORIS_PASSWORD);
         this.database = config.get(DorisSinkConnectorConfig.DORIS_DATABASE);
         this.taskId = Integer.parseInt(config.get(ConfigCheckUtils.TASK_ID));
-        this.databaseTimeZone = DorisSinkConnectorConfig.DATABASE_TIME_ZONE_DEFAULT;
-        if (config.containsKey(DorisSinkConnectorConfig.DATABASE_TIME_ZONE)) {
-            this.databaseTimeZone = config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE);
-        }
-        this.loadModel =
-                LoadModel.of(
-                        config.getOrDefault(
-                                DorisSinkConnectorConfig.LOAD_MODEL,
-                                DorisSinkConnectorConfig.LOAD_MODEL_DEFAULT));
+        this.databaseTimeZone = config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE);
+        this.loadModel = LoadModel.of(config.get(DorisSinkConnectorConfig.LOAD_MODEL));
         this.deliveryGuarantee =
-                DeliveryGuarantee.of(
-                        config.getOrDefault(
-                                DorisSinkConnectorConfig.DELIVERY_GUARANTEE,
-                                DorisSinkConnectorConfig.DELIVERY_GUARANTEE_DEFAULT));
-        this.converterMode =
-                ConverterMode.of(
-                        config.getOrDefault(
-                                DorisSinkConnectorConfig.CONVERT_MODE,
-                                DorisSinkConnectorConfig.CONVERT_MODE_DEFAULT));
+                DeliveryGuarantee.of(config.get(DorisSinkConnectorConfig.DELIVERY_GUARANTEE));
+        this.converterMode = ConverterMode.of(config.get(DorisSinkConnectorConfig.CONVERTER_MODE));
         this.schemaEvolutionMode =
                 SchemaEvolutionMode.of(
-                        config.getOrDefault(
-                                DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION,
-                                DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT));
-
+                        config.get(DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION));
         this.fileSize = Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES));
         this.recordNum =
                 Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS));
         this.flushTime = Long.parseLong(config.get(DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC));
-        if (flushTime < DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN) {
-            LOG.warn(
-                    "flush time is {} seconds, it is smaller than the minimum flush time {} seconds, reset to the minimum flush time",
-                    flushTime,
-                    DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN);
-            this.flushTime = DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN;
-        }
         this.topicMap = getTopicToTableMap(config);
-        this.enable2PC = DorisSinkConnectorConfig.ENABLE_2PC_DEFAULT;
-        if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
-            this.enable2PC = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
-        }
-        enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT;
-        if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) {
-            enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
-        }
-        enableDelete = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE));
+        this.enable2PC = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
+        this.enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
+        this.enableDelete =
+                Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE));
         this.requestConnectTimeoutMs =
                 DorisSinkConnectorConfig.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
         this.requestReadTimeoutMs = DorisSinkConnectorConfig.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;

diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 02c24d0..a204947 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -20,6 +20,8 @@
 package org.apache.doris.kafka.connector.cfg;

 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import org.apache.doris.kafka.connector.DorisSinkConnector;
 import org.apache.doris.kafka.connector.converter.ConverterMode;
@@ -77,7 +79,7 @@ public class DorisSinkConnectorConfig {
     public static final String AUTO_REDIRECT = "auto.redirect";
     public static final String DELIVERY_GUARANTEE = "delivery.guarantee";
     public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name();
-    public static final String CONVERT_MODE = "converter.mode";
+    public static final String CONVERTER_MODE = "converter.mode";
     public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName();

     // Prefix for Doris StreamLoad specific properties.
@@ -100,15 +102,37 @@ public class DorisSinkConnectorConfig {
     private static final ConfigDef.Validator topicToTableValidator = new TopicToTableValidator();

     public static void setDefaultValues(Map<String, String> config) {
-        setFieldToDefaultValues(config, BUFFER_COUNT_RECORDS, BUFFER_COUNT_RECORDS_DEFAULT);
-        setFieldToDefaultValues(config, BUFFER_SIZE_BYTES, BUFFER_SIZE_BYTES_DEFAULT);
-        setFieldToDefaultValues(config, BUFFER_FLUSH_TIME_SEC, BUFFER_FLUSH_TIME_SEC_DEFAULT);
+        setFieldToDefaultValues(
+                config, BUFFER_COUNT_RECORDS, String.valueOf(BUFFER_COUNT_RECORDS_DEFAULT));
+        setFieldToDefaultValues(
+                config, BUFFER_SIZE_BYTES, String.valueOf(BUFFER_SIZE_BYTES_DEFAULT));
+        setFieldToDefaultValues(
+                config, BUFFER_FLUSH_TIME_SEC, String.valueOf(BUFFER_FLUSH_TIME_SEC_DEFAULT));
+        setFieldToDefaultValues(config, DATABASE_TIME_ZONE, DATABASE_TIME_ZONE_DEFAULT);
+        setFieldToDefaultValues(config, LOAD_MODEL, LOAD_MODEL_DEFAULT);
+        setFieldToDefaultValues(config, DELIVERY_GUARANTEE, DELIVERY_GUARANTEE_DEFAULT);
+        setFieldToDefaultValues(config, CONVERTER_MODE, CONVERT_MODE_DEFAULT);
+        setFieldToDefaultValues(
+                config, DEBEZIUM_SCHEMA_EVOLUTION, DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT);
+        setFieldToDefaultValues(config, ENABLE_2PC, String.valueOf(ENABLE_2PC_DEFAULT));
+        setFieldToDefaultValues(config, JMX_OPT, String.valueOf(JMX_OPT_DEFAULT));
     }

-    static void setFieldToDefaultValues(Map<String, String> config, String field, Long value) {
+    public static Map<String, String> convertToLowercase(Map<String, String> config) {
+        Map<String, String> newConfig = new HashMap<>();
+        for (Map.Entry<String, String> configEntry : config.entrySet()) {
+            String key = configEntry.getKey();
+            String value = configEntry.getValue();
+            newConfig.put(key.toLowerCase(Locale.ROOT), value);
+        }
+        return newConfig;
+    }
+
+    private static void setFieldToDefaultValues(
+            Map<String, String> config, String field, String value) {
         if (!config.containsKey(field)) {
-            config.put(field, value + "");
-            LOG.info("{} set to default {} seconds", field, value);
+            config.put(field, value);
+            LOG.info("Set the default value of {} to {}", field, value);
         }
     }

diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java b/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
index c40f789..95f40f3 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/ConverterMode.java
@@ -38,4 +38,8 @@ public enum ConverterMode {
     public String getName() {
         return name;
     }
+
+    public static String[] instances() {
+        return new String[] {NORMAL.name, DEBEZIUM_INGESTION.name};
+    }
 }

diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
index d9b6a9b..e6d77b2 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java
@@ -37,4 +37,8 @@ public enum SchemaEvolutionMode {
     public String getName() {
         return name;
     }
+
+    public static String[] instances() {
+        return new String[] {NONE.name, BASIC.name};
+    }
 }

diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 2356f7d..3a6e583 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -23,8 +23,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.converter.ConverterMode;
+import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
 import org.apache.doris.kafka.connector.exception.ArgumentsException;
 import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
+import org.apache.doris.kafka.connector.writer.load.LoadModel;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigValue;
 import org.slf4j.Logger;
@@ -139,6 +143,42 @@ public class ConfigCheckUtils {
             configIsValid = false;
         }

+        String loadModel = config.get(DorisSinkConnectorConfig.LOAD_MODEL);
+        if (!validateEnumInstances(loadModel, LoadModel.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    loadModel,
+                    DorisSinkConnectorConfig.LOAD_MODEL);
+            configIsValid = false;
+        }
+
+        String deliveryGuarantee = config.get(DorisSinkConnectorConfig.DELIVERY_GUARANTEE);
+        if (!validateEnumInstances(deliveryGuarantee, DeliveryGuarantee.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    loadModel,
+                    DorisSinkConnectorConfig.DELIVERY_GUARANTEE);
+            configIsValid = false;
+        }
+
+        String converterMode = config.get(DorisSinkConnectorConfig.CONVERTER_MODE);
+        if (!validateEnumInstances(converterMode, ConverterMode.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    loadModel,
+                    DorisSinkConnectorConfig.CONVERTER_MODE);
+            configIsValid = false;
+        }
+
+        String schemaEvolutionMode = config.get(DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION);
+        if (!validateEnumInstances(schemaEvolutionMode, SchemaEvolutionMode.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    loadModel,
+                    DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION);
+            configIsValid = false;
+        }
+
         if (!configIsValid) {
             throw new DorisException(
                     "input kafka connector configuration is null, missing required values, or wrong input value");
@@ -254,4 +294,13 @@ public class ConfigCheckUtils {
     private static boolean isValidTableIdentifier(String tblName) {
         return tblName.matches("^[a-zA-Z][a-zA-Z0-9_]*$");
     }
+
+    private static boolean validateEnumInstances(String value, String[] instances) {
+        for (String instance : instances) {
+            if (instance.equalsIgnoreCase(value)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }

diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DeliveryGuarantee.java b/src/main/java/org/apache/doris/kafka/connector/writer/DeliveryGuarantee.java
index bea3cd2..f00c61f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DeliveryGuarantee.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DeliveryGuarantee.java
@@ -37,4 +37,8 @@ public enum DeliveryGuarantee {
     public String getName() {
         return name;
     }
+
+    public static String[] instances() {
+        return new String[] {EXACTLY_ONCE.name, AT_LEAST_ONCE.name};
+    }
 }

diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/LoadModel.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/LoadModel.java
index 4de2115..1b2cbc3 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/load/LoadModel.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/LoadModel.java
@@ -37,4 +37,8 @@ public enum LoadModel {
     public static LoadModel of(String name) {
         return LoadModel.valueOf(name.toUpperCase());
     }
+
+    public static String[] instances() {
+        return new String[] {STREAM_LOAD.name, COPY_INTO.name};
+    }
 }

diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisOptions.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisOptions.java
index 4b18372..20fe400 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisOptions.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisOptions.java
@@ -45,6 +45,7 @@ public class TestDorisOptions {
         props = new Properties();
         props.load(stream);
         props.put("task_id", "1");
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
     }

     @Test

diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index c09a3d9..eae3ec0 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -19,10 +19,13 @@
 package org.apache.doris.kafka.connector.cfg;

+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.doris.kafka.connector.exception.DorisException;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
+import org.junit.Assert;
 import org.junit.Test;

 public class TestDorisSinkConnectorConfig {
@@ -48,6 +51,7 @@ public class TestDorisSinkConnectorConfig {
         config.put(
                 DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC,
                 DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_DEFAULT + "");
+        DorisSinkConnectorConfig.setDefaultValues(config);
         return config;
     }

@@ -213,4 +217,77 @@ public class TestDorisSinkConnectorConfig {
         config.put(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS, "11adssadsa");
         ConfigCheckUtils.validateConfig(config);
     }
+
+    @Test(expected = DorisException.class)
+    public void testLoadModelException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.LOAD_MODEL, "stream_loada");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testLoadModel() {
+        Map<String, String> config = getConfig();
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testDeliveryGuarantee() {
+        Map<String, String> config = getConfig();
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testDeliveryGuaranteeException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.DELIVERY_GUARANTEE, "exactly_oncea");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testConverterMode() {
+        Map<String, String> config = getConfig();
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testConverterModeException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.CONVERTER_MODE, "debezium_ingestiona");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testSchemaEvolutionMode() {
+        Map<String, String> config = getConfig();
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testSchemaEvolutionModeException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION, "nonea");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test
+    public void testConvertToLowercase() {
+        Map<String, String> config = getConfig();
+        config.put("DELIVERY.guarantee", "at_least_once");
+        config.put("load.MODEL", "STREAM_LOAD");
+        config.put("DORIS.USER", "root");
+        config.put("Enable.deLete", "true");
+        config.put("doris.http.port", "8030");
+        Map<String, String> convertConfig = DorisSinkConnectorConfig.convertToLowercase(config);
+        List<String> result =
+                Arrays.asList(
+                        DorisSinkConnectorConfig.DELIVERY_GUARANTEE,
+                        DorisSinkConnectorConfig.LOAD_MODEL,
+                        DorisSinkConnectorConfig.DORIS_USER,
+                        DorisSinkConnectorConfig.ENABLE_DELETE,
+                        DorisSinkConnectorConfig.DORIS_HTTP_PORT);
+        for (String s : result) {
+            Assert.assertTrue(convertConfig.containsKey(s));
+        }
+    }
 }

diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 74af7c9..08e76ba 100644
--- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.converter.schema.SchemaChangeManager;
 import org.apache.doris.kafka.connector.exception.DorisException;
 import org.apache.doris.kafka.connector.model.doris.Schema;
@@ -70,6 +71,7 @@ public class TestRecordService {
                         .getClassLoader()
                         .getResourceAsStream("doris-connector-sink.properties");
         props.load(stream);
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
         props.put("task_id", "1");
         props.put("converter.mode", "debezium_ingestion");
         props.put("debezium.schema.evolution", "basic");
@@ -176,7 +178,7 @@ public class TestRecordService {

     @Test
     public void processStructRecord() throws IOException {
-        props.remove("converter.mode");
+        props.put("converter.mode", "normal");
         recordService = new RecordService(new DorisOptions((Map) props));
         String topic = "normal.wdl_test.test_sink_normal";

diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
index 2f06c93..302b9ea 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
 import org.apache.doris.kafka.connector.exception.CopyLoadException;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
@@ -59,6 +60,7 @@ public class TestCopyIntoWriter {
                         .getResourceAsStream("doris-connector-sink.properties");
         Properties props = new Properties();
         props.load(stream);
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
         props.put("task_id", "1");
         props.put("name", "connect-test-sink299");
         props.put("load.model", "copy_into");

diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyLoad.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyLoad.java
index 7ab29e0..bb92085 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyLoad.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyLoad.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.writer.load.CopyLoad;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -49,6 +50,7 @@ public class TestCopyLoad {
         Properties props = new Properties();
         props.load(stream);
         props.put("task_id", "1");
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
         options = new DorisOptions((Map) props);
     }

diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index 6f09bf1..7e44a2d 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
 import org.apache.doris.kafka.connector.exception.StreamLoadException;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
@@ -59,6 +60,7 @@ public class TestStreamLoadWriter {
                         .getResourceAsStream("doris-connector-sink.properties");
         Properties props = new Properties();
         props.load(stream);
+        DorisSinkConnectorConfig.setDefaultValues((Map) props);
         props.put("task_id", "1");
         props.put("name", "sink-connector-test");
         dorisOptions = new DorisOptions((Map) props);
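
For readers skimming the patch, here is a minimal, self-contained sketch of the behavior this commit introduces: configuration keys are lowercased once up front, and enum-valued options are matched against their allowed names case-insensitively. This is illustrative only and not code from the repository; the class and method names below are placeholders that mirror convertToLowercase and validateEnumInstances in the diff above.

// Illustrative sketch only (assumed names, not part of the commit).
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class CaseInsensitiveConfigSketch {

    // Analogous to DorisSinkConnectorConfig.convertToLowercase: keys are normalized
    // to lower case, values are left untouched.
    static Map<String, String> toLowercaseKeys(Map<String, String> config) {
        Map<String, String> normalized = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            normalized.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue());
        }
        return normalized;
    }

    // Analogous to ConfigCheckUtils.validateEnumInstances: an enum-valued option is
    // accepted if it matches one of the allowed names, ignoring case.
    static boolean isAllowedValue(String value, String[] allowed) {
        for (String candidate : allowed) {
            if (candidate.equalsIgnoreCase(value)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("LOAD.MODEL", "Stream_Load");            // mixed-case key and value
        userConfig.put("delivery.guarantee", "at_least_once");

        Map<String, String> config = toLowercaseKeys(userConfig);
        System.out.println(config.containsKey("load.model"));   // true: key was lowercased
        System.out.println(
                isAllowedValue(config.get("load.model"), new String[] {"stream_load", "copy_into"})); // true
    }
}

Normalizing keys a single time at connector start lets the rest of the connector read plain lowercase property names, while the case-insensitive value check keeps existing configurations working unchanged.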