This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d81331  [Improve] add retry strategy (#67)
8d81331 is described below

commit 8d81331fc5bfe752c41f9ff0dbb64bc36c5a91f5
Author: wudi <676366...@qq.com>
AuthorDate: Fri Apr 11 16:11:58 2025 +0800

    [Improve] add retry strategy (#67)
---
 .../doris/kafka/connector/DorisSinkTask.java       |  22 ++-
 .../doris/kafka/connector/cfg/DorisOptions.java    |  21 +++
 .../connector/cfg/DorisSinkConnectorConfig.java    |  23 +++-
 .../doris/kafka/connector/model/RespContent.java   |   4 +
 .../kafka/connector/utils/ConfigCheckUtils.java    |  16 +++
 .../doris/kafka/connector/writer/DorisWriter.java  |   3 +-
 .../connector/writer/load/DorisStreamLoad.java     |   3 +
 .../cfg/TestDorisSinkConnectorConfig.java          |  14 ++
 .../e2e/doris/DorisContainerServiceImpl.java       |  14 +-
 .../e2e/sink/AbstractKafka2DorisSink.java          |  55 ++++++++
 .../stringconverter/DorisSinkFailoverSinkTest.java | 149 +++++++++++++++++++++
 src/test/resources/docker/doris/be.conf            |  99 ++++++++++++++
 src/test/resources/docker/doris/fe.conf            |  74 ++++++++++
 .../string_msg_failover_connector.json             |  25 ++++
 .../string_converter/string_msg_tab_failover.sql   |  12 ++
 15 files changed, 528 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java 
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index f793d26..e83f033 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -22,6 +22,7 @@ package org.apache.doris.kafka.connector;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.service.DorisSinkService;
 import org.apache.doris.kafka.connector.service.DorisSinkServiceFactory;
 import org.apache.doris.kafka.connector.utils.Version;
@@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory;
 public class DorisSinkTask extends SinkTask {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisSinkTask.class);
     private DorisSinkService sink = null;
+    private DorisOptions options;
+    private int remainingRetries;
 
     /** default constructor, invoked by kafka connect framework */
     public DorisSinkTask() {}
@@ -49,7 +52,9 @@ public class DorisSinkTask extends SinkTask {
      */
     @Override
     public void start(final Map<String, String> parsedConfig) {
-        LOG.info("kafka doris sink task start");
+        LOG.info("kafka doris sink task start with {}", parsedConfig);
+        this.options = new DorisOptions(parsedConfig);
+        this.remainingRetries = options.getMaxRetries();
         this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig);
     }
 
@@ -94,7 +99,20 @@ public class DorisSinkTask extends SinkTask {
     @Override
     public void put(final Collection<SinkRecord> records) {
         LOG.info("Read {} records from Kafka", records.size());
-        sink.insert(records);
+        try {
+            sink.insert(records);
+        } catch (Exception ex) {
+            LOG.error("Error inserting records to Doris", ex);
+            if (remainingRetries > 0) {
+                LOG.info(
+                        "Retrying to insert records to Doris, remaining 
retries: {}",
+                        remainingRetries);
+                remainingRetries--;
+                context.timeout(options.getRetryIntervalMs());
+                throw new RetriableException(ex);
+            }
+            throw ex;
+        }
     }
 
     /**
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 9afd9cb..c1a2cbd 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -66,6 +66,8 @@ public class DorisOptions {
     private final DeliveryGuarantee deliveryGuarantee;
     private final ConverterMode converterMode;
     private final SchemaEvolutionMode schemaEvolutionMode;
+    private final int maxRetries;
+    private final int retryIntervalMs;
 
     public DorisOptions(Map<String, String> config) {
         this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -127,6 +129,17 @@ public class DorisOptions {
         }
         this.streamLoadProp = getStreamLoadPropFromConfig(config);
         this.enableGroupCommit = 
ConfigCheckUtils.validateGroupCommitMode(this);
+        this.maxRetries =
+                Integer.parseInt(
+                        config.getOrDefault(
+                                DorisSinkConnectorConfig.MAX_RETRIES,
+                                
String.valueOf(DorisSinkConnectorConfig.MAX_RETRIES_DEFAULT)));
+        this.retryIntervalMs =
+                Integer.parseInt(
+                        config.getOrDefault(
+                                DorisSinkConnectorConfig.RETRY_INTERVAL_MS,
+                                String.valueOf(
+                                        
DorisSinkConnectorConfig.RETRY_INTERVAL_MS_DEFAULT)));
     }
 
     private Properties getStreamLoadPropFromConfig(Map<String, String> config) 
{
@@ -320,4 +333,12 @@ public class DorisOptions {
     public boolean isEnableDelete() {
         return enableDelete;
     }
+
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    public int getRetryIntervalMs() {
+        return retryIntervalMs;
+    }
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 0dd7315..53442ca 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -89,6 +89,12 @@ public class DorisSinkConnectorConfig {
     public static final String DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT =
             SchemaEvolutionMode.NONE.getName();
 
+    public static final String MAX_RETRIES = "max.retries";
+    public static final int MAX_RETRIES_DEFAULT = 10;
+
+    public static final String RETRY_INTERVAL_MS = "retry.interval.ms";
+    public static final int RETRY_INTERVAL_MS_DEFAULT = 6000;
+
     // metrics
     public static final String JMX_OPT = "jmx";
     public static final boolean JMX_OPT_DEFAULT = true;
@@ -116,6 +122,9 @@ public class DorisSinkConnectorConfig {
         setFieldToDefaultValues(
                 config, DEBEZIUM_SCHEMA_EVOLUTION, 
DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT);
         setFieldToDefaultValues(config, JMX_OPT, 
String.valueOf(JMX_OPT_DEFAULT));
+        setFieldToDefaultValues(config, MAX_RETRIES, 
String.valueOf(MAX_RETRIES_DEFAULT));
+        setFieldToDefaultValues(
+                config, RETRY_INTERVAL_MS, 
String.valueOf(RETRY_INTERVAL_MS_DEFAULT));
     }
 
     public static Map<String, String> convertToLowercase(Map<String, String> 
config) {
@@ -270,7 +279,19 @@ public class DorisSinkConnectorConfig {
                         Type.STRING,
                         LOAD_MODEL_DEFAULT,
                         Importance.HIGH,
-                        "load model is stream_load.");
+                        "load model is stream_load.")
+                .define(
+                        MAX_RETRIES,
+                        Type.INT,
+                        MAX_RETRIES_DEFAULT,
+                        Importance.MEDIUM,
+                        "The maximum number of times to retry on errors before 
failing the task.")
+                .define(
+                        RETRY_INTERVAL_MS,
+                        Type.INT,
+                        RETRY_INTERVAL_MS_DEFAULT,
+                        Importance.MEDIUM,
+                        "The time in milliseconds to wait following an error 
before a retry attempt is made.");
     }
 
     public static class TopicToTableValidator implements ConfigDef.Validator {
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java 
b/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java
index 5d29337..b66c416 100644
--- a/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java
+++ b/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java
@@ -97,6 +97,10 @@ public class RespContent {
         return message;
     }
 
+    public String getLabel() {
+        return label;
+    }
+
     public String getExistingJobStatus() {
         return existingJobStatus;
     }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index c6611bc..d7a299f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -184,6 +184,22 @@ public class ConfigCheckUtils {
             configIsValid = false;
         }
 
+        String maxRetries = config.get(DorisSinkConnectorConfig.MAX_RETRIES);
+        if (!isNumeric(maxRetries) || isIllegalRange(maxRetries, 0)) {
+            LOG.error(
+                    "{} cannot be empty or not a number or less than 0.",
+                    DorisSinkConnectorConfig.MAX_RETRIES);
+            configIsValid = false;
+        }
+
+        String retryIntervalMs = 
config.get(DorisSinkConnectorConfig.RETRY_INTERVAL_MS);
+        if (!isNumeric(retryIntervalMs) || isIllegalRange(retryIntervalMs, 0)) 
{
+            LOG.error(
+                    "{} cannot be empty or not a number or less than 0.",
+                    DorisSinkConnectorConfig.RETRY_INTERVAL_MS);
+            configIsValid = false;
+        }
+
         if (!configIsValid) {
             throw new DorisException(
                     "input kafka connector configuration is null, missing 
required values, or wrong input value");
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 9481a2f..348f784 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -120,7 +120,7 @@ public abstract class DorisWriter {
                 && record.kafkaOffset() > processedOffset.get()) {
             SinkRecord dorisRecord = record;
             RecordBuffer tmpBuff = null;
-            processedOffset.set(dorisRecord.kafkaOffset());
+
             putBuffer(dorisRecord);
             if (buffer.getBufferSizeBytes() >= dorisOptions.getFileSize()
                     || (dorisOptions.getRecordNum() != 0
@@ -132,6 +132,7 @@ public abstract class DorisWriter {
             if (tmpBuff != null) {
                 flush(tmpBuff);
             }
+            processedOffset.set(dorisRecord.kafkaOffset());
         } else {
             LOG.warn(
                     "The record offset is smaller than processedOffset. 
recordOffset={}, offsetPersistedInDoris={}, processedOffset={}",
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
index 9ab4b8d..f3a3a62 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
@@ -102,6 +102,9 @@ public class DorisStreamLoad extends DataLoad {
                 LOG.info("load Result {}", loadResult);
                 KafkaRespContent respContent =
                         OBJECT_MAPPER.readValue(loadResult, 
KafkaRespContent.class);
+                if (respContent == null || respContent.getMessage() == null) {
+                    throw new StreamLoadException("response error : " + 
loadResult);
+                }
                 if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                     String errMsg =
                             String.format(
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
 
b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index 9e46cc1..5caa668 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -260,6 +260,20 @@ public class TestDorisSinkConnectorConfig {
         ConfigCheckUtils.validateConfig(config);
     }
 
+    @Test(expected = DorisException.class)
+    public void testMaxRetryException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.MAX_RETRIES, "abc");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testRetryIntervalMsException() {
+        Map<String, String> config = getConfig();
+        config.put(DorisSinkConnectorConfig.RETRY_INTERVAL_MS, "abc");
+        ConfigCheckUtils.validateConfig(config);
+    }
+
     @Test
     public void testSchemaEvolutionMode() {
         Map<String, String> config = getConfig();
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
index c328044..e626538 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
@@ -44,6 +44,7 @@ import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
 
 public class DorisContainerServiceImpl implements DorisContainerService {
     protected static final Logger LOG = 
LoggerFactory.getLogger(DorisContainerServiceImpl.class);
@@ -69,14 +70,23 @@ public class DorisContainerServiceImpl implements 
DorisContainerService {
                         .withLogConsumer(
                                 new Slf4jLogConsumer(
                                         
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
-                        .withExposedPorts(8030, 9030, 8040, 9060);
+                        // use customer conf
+                        .withCopyFileToContainer(
+                                
MountableFile.forClasspathResource("docker/doris/be.conf"),
+                                "/opt/apache-doris/be/conf/be.conf")
+                        .withCopyFileToContainer(
+                                
MountableFile.forClasspathResource("docker/doris/fe.conf"),
+                                "/opt/apache-doris/fe/conf/fe.conf")
+                        .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610);
 
         container.setPortBindings(
                 Lists.newArrayList(
                         String.format("%s:%s", "8030", "8030"),
                         String.format("%s:%s", "9030", "9030"),
                         String.format("%s:%s", "9060", "9060"),
-                        String.format("%s:%s", "8040", "8040")));
+                        String.format("%s:%s", "8040", "8040"),
+                        String.format("%s:%s", "9611", "9611"),
+                        String.format("%s:%s", "9610", "9610")));
         return container;
     }
 
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
index 78fd626..ad0b84a 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -36,12 +37,19 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.e2e.doris.DorisContainerService;
 import org.apache.doris.kafka.connector.e2e.doris.DorisContainerServiceImpl;
 import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerService;
 import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerServiceImpl;
 import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -195,4 +203,51 @@ public abstract class AbstractKafka2DorisSink {
         LOG.info("actual result: {}", Arrays.toString(actual.toArray()));
         Assert.assertArrayEquals(expected.toArray(), actual.toArray());
     }
+
+    protected void faultInjectionOpen() throws IOException {
+        String pointName = "FlushToken.submit_flush_error";
+        String apiUrl =
+                String.format(
+                        "http://%s:%s/api/debug_point/add/%s",
+                        dorisContainerService.getInstanceHost(), 8040, 
pointName);
+        HttpPost httpPost = new HttpPost(apiUrl);
+        httpPost.addHeader(HttpHeaders.AUTHORIZATION, auth(USERNAME, 
PASSWORD));
+        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+            try (CloseableHttpResponse response = 
httpClient.execute(httpPost)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                String reason = response.getStatusLine().toString();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    LOG.info("Debug point response {}", 
EntityUtils.toString(response.getEntity()));
+                } else {
+                    LOG.info("Debug point failed, statusCode: {}, reason: {}", 
statusCode, reason);
+                }
+            }
+        }
+    }
+
+    protected void faultInjectionClear() throws IOException {
+        String apiUrl =
+                String.format(
+                        "http://%s:%s/api/debug_point/clear",
+                        dorisContainerService.getInstanceHost(), 8040);
+        HttpPost httpPost = new HttpPost(apiUrl);
+        httpPost.addHeader(HttpHeaders.AUTHORIZATION, auth(USERNAME, 
PASSWORD));
+        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+            try (CloseableHttpResponse response = 
httpClient.execute(httpPost)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                String reason = response.getStatusLine().toString();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    LOG.info("Debug point response {}", 
EntityUtils.toString(response.getEntity()));
+                } else {
+                    LOG.info("Debug point failed, statusCode: {}, reason: {}", 
statusCode, reason);
+                }
+            }
+        }
+    }
+
+    protected String auth(String user, String password) {
+        final String authInfo = user + ":" + password;
+        byte[] encoded = 
Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + new String(encoded);
+    }
 }
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
new file mode 100644
index 0000000..c81055f
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink.stringconverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** DorisSinkFailoverSinkTest is a test class for Doris Sink Connector. */
+public class DorisSinkFailoverSinkTest extends AbstractStringE2ESinkTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisSinkFailoverSinkTest.class);
+    private static String connectorName;
+    private static String jsonMsgConnectorContent;
+    private static DorisOptions dorisOptions;
+    private static String database;
+
+    @BeforeClass
+    public static void setUp() {
+        initServer();
+        initProducer();
+    }
+
+    public static void initialize(String connectorPath) {
+        jsonMsgConnectorContent = loadContent(connectorPath);
+        JsonNode rootNode = null;
+        try {
+            rootNode = objectMapper.readTree(jsonMsgConnectorContent);
+        } catch (IOException e) {
+            throw new DorisException("Failed to read content body.", e);
+        }
+        connectorName = rootNode.get(NAME).asText();
+        JsonNode configNode = rootNode.get(CONFIG);
+        Map<String, String> configMap = objectMapper.convertValue(configNode, 
Map.class);
+        configMap.put(ConfigCheckUtils.TASK_ID, "1");
+        Map<String, String> lowerCaseConfigMap =
+                DorisSinkConnectorConfig.convertToLowercase(configMap);
+        DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap);
+        dorisOptions = new DorisOptions(lowerCaseConfigMap);
+        database = dorisOptions.getDatabase();
+        createDatabase(database);
+        setTimeZone();
+    }
+
+    private static void setTimeZone() {
+        executeSql(getJdbcConnection(), "set global time_zone = 
'Asia/Shanghai'");
+    }
+
+    /** mock streamload failure */
+    @Test
+    public void testStreamLoadFailoverSink() throws Exception {
+        LOG.info("start to test testStreamLoadFailoverSink.");
+        
initialize("src/test/resources/e2e/string_converter/string_msg_failover_connector.json");
+        Thread.sleep(5000);
+        String topic = "string_test_failover";
+        String msg1 = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
+        produceMsg2Kafka(topic, msg1);
+
+        String tableSql =
+                
loadContent("src/test/resources/e2e/string_converter/string_msg_tab_failover.sql");
+        createTable(tableSql);
+
+        kafkaContainerService.registerKafkaConnector(connectorName, 
jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        String querySql =
+                String.format("select id,name,age from %s.%s order by id", 
database, table);
+        LOG.info("start to query result from doris. sql={}", querySql);
+        while (true) {
+            List<String> result = executeSQLStatement(getJdbcConnection(), 
LOG, querySql, 3);
+            // until load success one time
+            if (result.size() >= 1) {
+                faultInjectionOpen();
+                // mock new data
+                String msg2 = "{\"id\":2,\"name\":\"lisi\",\"age\":18}";
+                produceMsg2Kafka(topic, msg2);
+                Thread.sleep(15000);
+                faultInjectionClear();
+                break;
+            } else {
+                Thread.sleep(100);
+            }
+        }
+
+        String msg3 = "{\"id\":3,\"name\":\"wangwu\",\"age\":38}";
+        produceMsg2Kafka(topic, msg3);
+        Thread.sleep(25000);
+
+        List<String> excepted = Arrays.asList("1,zhangsan,12", "2,lisi,18", 
"3,wangwu,38");
+        checkResult(excepted, querySql, 3);
+    }
+
+    public static List<String> executeSQLStatement(
+            Connection connection, Logger logger, String sql, int columnSize) {
+        List<String> result = new ArrayList<>();
+        if (Objects.isNull(sql)) {
+            return result;
+        }
+        try (Statement statement = connection.createStatement()) {
+            logger.info("start to execute sql={}", sql);
+            ResultSet resultSet = statement.executeQuery(sql);
+
+            while (resultSet.next()) {
+                StringJoiner sb = new StringJoiner(",");
+                for (int i = 1; i <= columnSize; i++) {
+                    Object value = resultSet.getObject(i);
+                    sb.add(String.valueOf(value));
+                }
+                result.add(sb.toString());
+            }
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/src/test/resources/docker/doris/be.conf 
b/src/test/resources/docker/doris/be.conf
new file mode 100644
index 0000000..94b76e0
--- /dev/null
+++ b/src/test/resources/docker/doris/be.conf
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+PPROF_TMPDIR="$DORIS_HOME/log/"
+
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true 
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 9+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true 
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 17+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true 
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives 
--add-opens=java.base/java.net=ALL-UNNAMED"
+
+# since 1.2, the JAVA_HOME need to be set to run BE process.
+# JAVA_HOME=/path/to/jdk/
+
+# 
https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
+# https://jemalloc.net/jemalloc.3.html
+JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:false,lg_prof_interval:32,lg_prof_sample:19,prof_gdump:false,prof_accum:false,prof_leak:false,prof_final:false"
+JEMALLOC_PROF_PRFIX=""
+
+# INFO, WARNING, ERROR, FATAL
+sys_log_level = INFO
+
+# ports for admin, web, heartbeat service
+be_port = 9060
+webserver_port = 8040
+heartbeat_service_port = 9050
+brpc_port = 8060
+arrow_flight_sql_port = 9610
+enable_debug_points = true
+
+# HTTPS configures
+enable_https = false
+# path of certificate in PEM format.
+ssl_certificate_path = "$DORIS_HOME/conf/cert.pem"
+# path of private key in PEM format.
+ssl_private_key_path = "$DORIS_HOME/conf/key.pem"
+
+
+# Choose one if there are more than one ip except loopback address.
+# Note that there should at most one ip match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# data root path, separate by ';'
+# You can specify the storage type for each root path, HDD (cold data) or SSD 
(hot data)
+# eg:
+# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris
+# storage_root_path = 
/home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD
+# /home/disk2/doris,medium:HDD(default)
+#
+# you also can specify the properties by setting '<property>:<value>', 
separate by ','
+# property 'medium' has a higher priority than the extension of path
+#
+# Default value is ${DORIS_HOME}/storage, you should create it by hand.
+# storage_root_path = ${DORIS_HOME}/storage
+
+# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+# Advanced configurations
+# sys_log_dir = ${DORIS_HOME}/log
+# sys_log_roll_mode = SIZE-MB-1024
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = *
+# log_buffer_level = -1
+# palo_cgroups
+
+# aws sdk log level
+#    Off = 0,
+#    Fatal = 1,
+#    Error = 2,
+#    Warn = 3,
+#    Info = 4,
+#    Debug = 5,
+#    Trace = 6
+# Default to turn off aws sdk log, because aws sdk errors that need to be 
cared will be output through Doris logs
+aws_log_level=0
+## If you are not running in aws cloud, you can disable EC2 metadata
+AWS_EC2_METADATA_DISABLED=true
\ No newline at end of file
diff --git a/src/test/resources/docker/doris/fe.conf 
b/src/test/resources/docker/doris/fe.conf
new file mode 100644
index 0000000..a45fb53
--- /dev/null
+++ b/src/test/resources/docker/doris/fe.conf
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#####################################################################
+## The uppercase properties are read and exported by bin/start_fe.sh.
+## To see all Frontend configurations,
+## see fe/src/org/apache/doris/common/Config.java
+#####################################################################
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+# Log dir
+LOG_DIR = ${DORIS_HOME}/log
+
+# For jdk 17, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 
-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m 
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR 
-Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M
 --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens 
java.base/jdk.internal.ref=ALL-UNNAMED"
+
+# Set your own JAVA_HOME
+# JAVA_HOME=/path/to/jdk/
+
+##
+## the lowercase properties are read by main program.
+##
+
+# store metadata, must be created before start FE.
+# Default value is ${DORIS_HOME}/doris-meta
+# meta_dir = ${DORIS_HOME}/doris-meta
+
+# Default dir to put jdbc drivers; default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+http_port = 8030
+rpc_port = 9020
+query_port = 9030
+edit_log_port = 9010
+arrow_flight_sql_port = 9611
+enable_debug_points = true
+arrow_flight_token_cache_size = 50
+# Choose one IP if there is more than one, excluding the loopback address.
+# Note that at most one IP should match this list.
+# If no IP matches this rule, one will be chosen randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# Advanced configurations
+# log_roll_size_mb = 1024
+# INFO, WARN, ERROR, FATAL
+sys_log_level = INFO
+# NORMAL, BRIEF, ASYNC
+sys_log_mode = ASYNC
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = org.apache.doris
+# audit_log_dir = $LOG_DIR
+# audit_log_modules = slow_query, query
+# audit_log_roll_num = 10
+# meta_delay_toleration_second = 10
+# qe_max_connection = 1024
+# qe_query_timeout_second = 300
+# qe_slow_log_ms = 5000
\ No newline at end of file
diff --git 
a/src/test/resources/e2e/string_converter/string_msg_failover_connector.json 
b/src/test/resources/e2e/string_converter/string_msg_failover_connector.json
new file mode 100644
index 0000000..d8604b5
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_failover_connector.json
@@ -0,0 +1,25 @@
+{
+  "name":"string_msg_failover_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"string_test_failover",
+    "tasks.max":"1",
+    "doris.topic2table.map": "string_test_failover:string_msg_tab_failover",
+    "buffer.count.records":"1",
+    "buffer.flush.time":"1200",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"string_msg_failover",
+    "load.model":"stream_load",
+    "delivery.guarantee":"exactly_once",
+    "enable.2pc": "true",
+    "max.retries": "10",
+    "retry.interval.ms": "5000",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+  }
+}
\ No newline at end of file
diff --git 
a/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql 
b/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql
new file mode 100644
index 0000000..155f217
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database 
in the file where the connector is registered.
+CREATE TABLE string_msg_failover.string_msg_tab_failover (
+  id INT NULL,
+  name VARCHAR(100) NULL,
+  age INT NULL
+) ENGINE=OLAP
+DUPLICATE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to