This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 56b4668  [Improve] add behavior on null values (#69)
56b4668 is described below

commit 56b4668696b3c33d044ecd7cc90e9b79dab18190
Author: wangchuang <59386838+chuang-wang-...@users.noreply.github.com>
AuthorDate: Tue Apr 29 10:50:13 2025 +0800

    [Improve] add behavior on null values (#69)
---
 .../doris/kafka/connector/cfg/DorisOptions.java    |  9 +++
 .../connector/cfg/DorisSinkConnectorConfig.java    | 13 ++++-
 .../connector/model/BehaviorOnNullValues.java      | 40 +++++++++++++
 .../connector/service/DorisDefaultSinkService.java | 39 ++++++++++---
 .../kafka/connector/utils/ConfigCheckUtils.java    | 10 ++++
 .../cfg/TestDorisSinkConnectorConfig.java          |  7 +++
 .../e2e/sink/stringconverter/StringMsgE2ETest.java | 55 ++++++++++++++++++
 .../connector/service/TestDorisSinkService.java    | 67 +++++++++++++++++++++-
 .../e2e/string_converter/null_values_default.json  | 23 ++++++++
 .../e2e/string_converter/null_values_fail.json     | 24 ++++++++
 .../e2e/string_converter/null_values_ignore.json   | 24 ++++++++
 .../e2e/string_converter/null_values_tab.sql       | 12 ++++
 12 files changed, 314 insertions(+), 9 deletions(-)
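
For orientation: the patch adds one sink property, behavior.on.null.values, with two accepted values: "ignore" (the default; null-valued records are skipped with a DEBUG log) and "fail" (the task throws a DataException). A minimal, hedged sketch of how the property is set; the wrapper class below is hypothetical and not part of the patch:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;

    // Hypothetical illustration only: the one new user-facing knob.
    // "ignore" (default) skips null-valued records; "fail" raises a DataException.
    public class NullValuesConfigSketch {
        public static void main(String[] args) {
            Map<String, String> sinkConfig = new HashMap<>();
            sinkConfig.put(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES, "fail");
            // DorisOptions (first hunk below) parses this value and exposes it
            // through getBehaviorOnNullValues().
            System.out.println(sinkConfig); // {behavior.on.null.values=fail}
        }
    }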

diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index c1a2cbd..922eee6 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -28,6 +28,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 import org.apache.doris.kafka.connector.converter.ConverterMode;
 import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
+import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -68,6 +69,7 @@ public class DorisOptions {
     private final SchemaEvolutionMode schemaEvolutionMode;
     private final int maxRetries;
     private final int retryIntervalMs;
+    private final BehaviorOnNullValues behaviorOnNullValues;
 
     public DorisOptions(Map<String, String> config) {
         this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -140,6 +142,9 @@ public class DorisOptions {
                                 DorisSinkConnectorConfig.RETRY_INTERVAL_MS,
                                 String.valueOf(
                                         DorisSinkConnectorConfig.RETRY_INTERVAL_MS_DEFAULT)));
+        this.behaviorOnNullValues =
+                BehaviorOnNullValues.of(
+                        config.get(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES));
     }
 
     private Properties getStreamLoadPropFromConfig(Map<String, String> config) {
@@ -341,4 +346,8 @@ public class DorisOptions {
     public int getRetryIntervalMs() {
         return retryIntervalMs;
     }
+
+    public BehaviorOnNullValues getBehaviorOnNullValues() {
+        return behaviorOnNullValues;
+    }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 53442ca..5b8807e 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.doris.kafka.connector.DorisSinkConnector;
 import org.apache.doris.kafka.connector.converter.ConverterMode;
 import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
+import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -95,6 +96,9 @@ public class DorisSinkConnectorConfig {
     public static final String RETRY_INTERVAL_MS = "retry.interval.ms";
     public static final int RETRY_INTERVAL_MS_DEFAULT = 6000;
 
+    public static final String BEHAVIOR_ON_NULL_VALUES = "behavior.on.null.values";
+    public static final String BEHAVIOR_ON_NULL_VALUES_DEFAULT = BehaviorOnNullValues.IGNORE.name();
+
     // metrics
     public static final String JMX_OPT = "jmx";
     public static final boolean JMX_OPT_DEFAULT = true;
@@ -125,6 +129,7 @@ public class DorisSinkConnectorConfig {
         setFieldToDefaultValues(config, MAX_RETRIES, String.valueOf(MAX_RETRIES_DEFAULT));
         setFieldToDefaultValues(
                 config, RETRY_INTERVAL_MS, String.valueOf(RETRY_INTERVAL_MS_DEFAULT));
+        setFieldToDefaultValues(config, BEHAVIOR_ON_NULL_VALUES, BEHAVIOR_ON_NULL_VALUES_DEFAULT);
     }
 
     public static Map<String, String> convertToLowercase(Map<String, String> config) {
@@ -291,7 +296,13 @@ public class DorisSinkConnectorConfig {
                         Type.INT,
                         RETRY_INTERVAL_MS_DEFAULT,
                         Importance.MEDIUM,
-                        "The time in milliseconds to wait following an error before a retry attempt is made.");
+                        "The time in milliseconds to wait following an error before a retry attempt is made.")
+                .define(
+                        BEHAVIOR_ON_NULL_VALUES,
+                        Type.STRING,
+                        BEHAVIOR_ON_NULL_VALUES_DEFAULT,
+                        Importance.LOW,
+                        "Used to handle records with a null value.");
     }
 
     public static class TopicToTableValidator implements ConfigDef.Validator {
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java b/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java
new file mode 100644
index 0000000..4398c6d
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model;
+
+public enum BehaviorOnNullValues {
+    IGNORE("ignore"),
+
+    FAIL("fail");
+
+    private String name;
+
+    BehaviorOnNullValues(String name) {
+        this.name = name;
+    }
+
+    public static BehaviorOnNullValues of(String name) {
+        return BehaviorOnNullValues.valueOf(name.toUpperCase());
+    }
+
+    public static String[] instances() {
+        return new String[] {IGNORE.name, FAIL.name};
+    }
+}
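
A short usage sketch for the new enum (illustration only, not part of the patch): of() upper-cases its argument, so config values are matched case-insensitively, and instances() supplies the accepted strings that ConfigCheckUtils validates against further below.

    import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;

    // Hypothetical driver class, for illustration only.
    public class BehaviorOnNullValuesEnumSketch {
        public static void main(String[] args) {
            // "fail", "Fail" and "FAIL" all resolve to FAIL.
            BehaviorOnNullValues mode = BehaviorOnNullValues.of("Fail");
            // The accepted config strings: "ignore", "fail".
            String[] accepted = BehaviorOnNullValues.instances();
            System.out.println(mode + ", accepted=" + String.join(",", accepted));
        }
    }
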
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 3a220f1..e091264 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -32,10 +32,12 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.DorisSinkTask;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.connection.ConnectionProvider;
 import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
 import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
 import org.apache.doris.kafka.connector.metrics.MetricsJmxReporter;
+import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;
 import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
 import org.apache.doris.kafka.connector.writer.DorisWriter;
 import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
@@ -44,6 +46,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
@@ -126,13 +129,8 @@ public class DorisDefaultSinkService implements DorisSinkService {
     public void insert(final Collection<SinkRecord> records) {
         // note that records can be empty
         for (SinkRecord record : records) {
-            // skip null value records
-            if (record.value() == null) {
-                LOG.debug(
-                        "Null valued record from topic '{}', partition {} and offset {} was skipped",
-                        record.topic(),
-                        record.kafkaPartition(),
-                        record.kafkaOffset());
+            // skip records
+            if (shouldSkipRecord(record)) {
                 continue;
             }
             // check topic mutating SMTs
@@ -212,6 +210,33 @@ public class DorisDefaultSinkService implements DorisSinkService {
         }
     }
 
+    @VisibleForTesting
+    public boolean shouldSkipRecord(SinkRecord record) {
+        if (record.value() == null) {
+            switch (dorisOptions.getBehaviorOnNullValues()) {
+                case FAIL:
+                    throw new DataException(
+                            String.format(
+                                    "Null valued record from topic %s, partition %s and offset %s was failed "
+                                            + "(the configuration property '%s' is '%s').",
+                                    record.topic(),
+                                    record.kafkaPartition(),
+                                    record.kafkaOffset(),
+                                    DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES,
+                                    BehaviorOnNullValues.FAIL));
+                case IGNORE:
+                default:
+                    LOG.debug(
+                            "Null valued record from topic '{}', partition {} and offset {} was skipped",
+                            record.topic(),
+                            record.kafkaPartition(),
+                            record.kafkaOffset());
+                    return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * Get the table name in doris for the given record.
      *
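
For reference, a hedged sketch (not part of the patch) of what the new branch means for a Kafka tombstone, i.e. a record whose value is null; the SinkRecord is built the same way as in the unit test added below.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.sink.SinkRecord;

    // Illustration only: a null-valued record as the sink task would see it.
    public class TombstoneRecordSketch {
        public static void main(String[] args) {
            SinkRecord tombstone =
                    new SinkRecord(
                            "topic_test",
                            0,
                            Schema.OPTIONAL_STRING_SCHEMA,
                            "key",
                            Schema.OPTIONAL_STRING_SCHEMA,
                            null, // tombstone value
                            1);
            // behavior.on.null.values=ignore (default): shouldSkipRecord(tombstone)
            // logs at DEBUG and returns true, so insert() skips the record.
            // behavior.on.null.values=fail: shouldSkipRecord throws
            // org.apache.kafka.connect.errors.DataException and the task fails.
            System.out.println("value is null: " + (tombstone.value() == null));
        }
    }
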
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index d7a299f..465b7dc 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -32,6 +32,7 @@ import org.apache.doris.kafka.connector.converter.ConverterMode;
 import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
 import org.apache.doris.kafka.connector.exception.ArgumentsException;
 import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
 import org.apache.doris.kafka.connector.writer.LoadConstants;
 import org.apache.doris.kafka.connector.writer.load.GroupCommitMode;
@@ -200,6 +201,15 @@ public class ConfigCheckUtils {
             configIsValid = false;
         }
 
+        String behaviorOnNullValues = config.get(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES);
+        if (!validateEnumInstances(behaviorOnNullValues, BehaviorOnNullValues.instances())) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    behaviorOnNullValues,
+                    DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES);
+            configIsValid = false;
+        }
+
         if (!configIsValid) {
             throw new DorisException(
                     "input kafka connector configuration is null, missing 
required values, or wrong input value");
diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index 5caa668..e6633ec 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -274,6 +274,13 @@ public class TestDorisSinkConnectorConfig {
         ConfigCheckUtils.validateConfig(config);
     }
 
+    @Test(expected = DorisException.class)
+    public void testBehaviorOnNullValuesException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES, "create");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
     @Test
     public void testSchemaEvolutionMode() {
         Map<String, String> config = getConfig();
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 4430dad..9ec116f 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -331,6 +331,61 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         checkResult(expectedResult, query1, 3);
     }
 
+    @Test
+    public void testBehaviorOnNullValues() throws Exception {
+        // default
+        initialize("src/test/resources/e2e/string_converter/null_values_default.json");
+        String tableSql =
+                loadContent("src/test/resources/e2e/string_converter/null_values_tab.sql");
+        createTable(tableSql);
+        Thread.sleep(2000);
+
+        String topic = "behavior_on_null_values_test";
+        String msg1 = "{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\"}";
+        produceMsg2Kafka(topic, null);
+        produceMsg2Kafka(topic, msg1);
+
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        List<String> expected = Collections.singletonList("1,col1,col2");
+        Thread.sleep(10000);
+        String query1 =
+                String.format(
+                        "select id,col1,col2 from %s.%s order by id", database, "null_values_tab");
+        checkResult(expected, query1, 3);
+
+        kafkaContainerService.deleteKafkaConnector(connectorName);
+
+        // ignore
+        initialize("src/test/resources/e2e/string_converter/null_values_ignore.json");
+        produceMsg2Kafka(topic, null);
+        String msg2 = "{\"id\":2,\"col1\":\"col1\",\"col2\":\"col2\"}";
+        produceMsg2Kafka(topic, msg2);
+
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        Thread.sleep(10000);
+        expected = Arrays.asList("1,col1,col2", "2,col1,col2");
+        checkResult(expected, query1, 3);
+
+        kafkaContainerService.deleteKafkaConnector(connectorName);
+
+        // fail
+        initialize("src/test/resources/e2e/string_converter/null_values_fail.json");
+        produceMsg2Kafka(topic, null);
+
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        Thread.sleep(10000);
+        Assert.assertEquals("FAILED", kafkaContainerService.getConnectorTaskStatus(connectorName));
+
+        String msg3 = "{\"id\":3,\"col1\":\"col1\",\"col2\":\"col2\"}";
+        produceMsg2Kafka(topic, msg3);
+        Thread.sleep(10000);
+        // msg3 is not consumed
+        checkResult(expected, query1, 3);
+    }
+
     @AfterClass
     public static void closeInstance() {
         kafkaContainerService.deleteKafkaConnector(connectorName);
diff --git a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
index dc2e6ae..a99e144 100644
--- a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTaskContext;
@@ -47,6 +48,7 @@ public class TestDorisSinkService {
 
     private DorisDefaultSinkService dorisDefaultSinkService;
     private JsonConverter jsonConverter = new JsonConverter();
+    private Properties props = new Properties();
 
     @Before
     public void init() throws IOException {
@@ -54,7 +56,6 @@ public class TestDorisSinkService {
                 this.getClass()
                         .getClassLoader()
                         .getResourceAsStream("doris-connector-sink.properties");
-        Properties props = new Properties();
         props.load(stream);
         DorisSinkConnectorConfig.setDefaultValues((Map) props);
         props.put("task_id", "1");
@@ -170,4 +171,68 @@ public class TestDorisSinkService {
                 ConnectException.class,
                 () -> dorisDefaultSinkService.checkTopicMutating(record2));
     }
+
+    @Test
+    public void shouldSkipRecordTest() {
+        // default
+        SinkRecord record1 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "{\"id\":1,\"name\":\"bob\",\"age\":12}",
+                        1);
+        Assert.assertFalse(dorisDefaultSinkService.shouldSkipRecord(record1));
+        SinkRecord record2 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
+                                .optional()
+                                .build(),
+                        null,
+                        1);
+        Assert.assertTrue(dorisDefaultSinkService.shouldSkipRecord(record2));
+
+        // ignore value
+        props.put("behavior.on.null.values", "ignore");
+        dorisDefaultSinkService =
+                new DorisDefaultSinkService((Map) props, mock(SinkTaskContext.class));
+        SinkRecord record3 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
+                                .optional()
+                                .build(),
+                        null,
+                        1);
+        Assert.assertTrue(dorisDefaultSinkService.shouldSkipRecord(record3));
+
+        // fail value
+        props.put("behavior.on.null.values", "fail");
+        dorisDefaultSinkService =
+                new DorisDefaultSinkService((Map) props, mock(SinkTaskContext.class));
+        SinkRecord record4 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
+                                .optional()
+                                .build(),
+                        null,
+                        1);
+        Assert.assertThrows(
+                "Null valued record from topic topic_test, partition 0 and offset 1 was failed (the configuration property 'behavior.on.null.values' is 'FAIL').",
+                DataException.class,
+                () -> dorisDefaultSinkService.shouldSkipRecord(record4));
+    }
 }
diff --git a/src/test/resources/e2e/string_converter/null_values_default.json b/src/test/resources/e2e/string_converter/null_values_default.json
new file mode 100644
index 0000000..9ca212f
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/null_values_default.json
@@ -0,0 +1,23 @@
+{
+  "name":"null_values_default_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"behavior_on_null_values_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab",
+    "buffer.count.records":"1",
+    "buffer.flush.time":"10",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"string_msg",
+    "enable.2pc": "false",
+    "load.model":"stream_load",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter.schemas.enable": "false"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/null_values_fail.json b/src/test/resources/e2e/string_converter/null_values_fail.json
new file mode 100644
index 0000000..b12628f
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/null_values_fail.json
@@ -0,0 +1,24 @@
+{
+  "name":"null_values_fail_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"behavior_on_null_values_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab",
+    "buffer.count.records":"1",
+    "buffer.flush.time":"10",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"string_msg",
+    "enable.2pc": "false",
+    "load.model":"stream_load",
+    "behavior.on.null.values":"fail",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter.schemas.enable": "false"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/null_values_ignore.json b/src/test/resources/e2e/string_converter/null_values_ignore.json
new file mode 100644
index 0000000..05b8e89
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/null_values_ignore.json
@@ -0,0 +1,24 @@
+{
+  "name":"null_values_ignore_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"behavior_on_null_values_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab",
+    "buffer.count.records":"1",
+    "buffer.flush.time":"10",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"string_msg",
+    "enable.2pc": "false",
+    "load.model":"stream_load",
+    "behavior.on.null.values":"ignore",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter.schemas.enable": "false"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/null_values_tab.sql b/src/test/resources/e2e/string_converter/null_values_tab.sql
new file mode 100644
index 0000000..22749d5
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/null_values_tab.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE string_msg.null_values_tab (
+  id INT NULL,
+  col1 VARCHAR(20) NULL,
+  col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
