This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8d81331 [Improve] add retry strategy (#67) 8d81331 is described below commit 8d81331fc5bfe752c41f9ff0dbb64bc36c5a91f5 Author: wudi <676366...@qq.com> AuthorDate: Fri Apr 11 16:11:58 2025 +0800 [Improve] add retry strategy (#67) --- .../doris/kafka/connector/DorisSinkTask.java | 22 ++- .../doris/kafka/connector/cfg/DorisOptions.java | 21 +++ .../connector/cfg/DorisSinkConnectorConfig.java | 23 +++- .../doris/kafka/connector/model/RespContent.java | 4 + .../kafka/connector/utils/ConfigCheckUtils.java | 16 +++ .../doris/kafka/connector/writer/DorisWriter.java | 3 +- .../connector/writer/load/DorisStreamLoad.java | 3 + .../cfg/TestDorisSinkConnectorConfig.java | 14 ++ .../e2e/doris/DorisContainerServiceImpl.java | 14 +- .../e2e/sink/AbstractKafka2DorisSink.java | 55 ++++++++ .../stringconverter/DorisSinkFailoverSinkTest.java | 149 +++++++++++++++++++++ src/test/resources/docker/doris/be.conf | 99 ++++++++++++++ src/test/resources/docker/doris/fe.conf | 74 ++++++++++ .../string_msg_failover_connector.json | 25 ++++ .../string_converter/string_msg_tab_failover.sql | 12 ++ 15 files changed, 528 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java index f793d26..e83f033 100644 --- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java +++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java @@ -22,6 +22,7 @@ package org.apache.doris.kafka.connector; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.service.DorisSinkService; import org.apache.doris.kafka.connector.service.DorisSinkServiceFactory; import org.apache.doris.kafka.connector.utils.Version; @@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory; public class DorisSinkTask extends SinkTask { private static final Logger LOG = LoggerFactory.getLogger(DorisSinkTask.class); private DorisSinkService sink = null; + private DorisOptions options; + private int remainingRetries; /** default constructor, invoked by kafka connect framework */ public DorisSinkTask() {} @@ -49,7 +52,9 @@ public class DorisSinkTask extends SinkTask { */ @Override public void start(final Map<String, String> parsedConfig) { - LOG.info("kafka doris sink task start"); + LOG.info("kafka doris sink task start with {}", parsedConfig); + this.options = new DorisOptions(parsedConfig); + this.remainingRetries = options.getMaxRetries(); this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig); } @@ -94,7 +99,20 @@ public class DorisSinkTask extends SinkTask { @Override public void put(final Collection<SinkRecord> records) { LOG.info("Read {} records from Kafka", records.size()); - sink.insert(records); + try { + sink.insert(records); + } catch (Exception ex) { + LOG.error("Error inserting records to Doris", ex); + if (remainingRetries > 0) { + LOG.info( + "Retrying to insert records to Doris, remaining retries: {}", + remainingRetries); + remainingRetries--; + context.timeout(options.getRetryIntervalMs()); + throw new RetriableException(ex); + } + throw ex; + } } /** diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 9afd9cb..c1a2cbd 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -66,6 +66,8 @@ public class DorisOptions { private final DeliveryGuarantee deliveryGuarantee; private final ConverterMode converterMode; private final SchemaEvolutionMode schemaEvolutionMode; + private final int maxRetries; + private final int retryIntervalMs; public DorisOptions(Map<String, String> config) { this.name = config.get(DorisSinkConnectorConfig.NAME); @@ -127,6 +129,17 @@ public class DorisOptions { } this.streamLoadProp = getStreamLoadPropFromConfig(config); this.enableGroupCommit = ConfigCheckUtils.validateGroupCommitMode(this); + this.maxRetries = + Integer.parseInt( + config.getOrDefault( + DorisSinkConnectorConfig.MAX_RETRIES, + String.valueOf(DorisSinkConnectorConfig.MAX_RETRIES_DEFAULT))); + this.retryIntervalMs = + Integer.parseInt( + config.getOrDefault( + DorisSinkConnectorConfig.RETRY_INTERVAL_MS, + String.valueOf( + DorisSinkConnectorConfig.RETRY_INTERVAL_MS_DEFAULT))); } private Properties getStreamLoadPropFromConfig(Map<String, String> config) { @@ -320,4 +333,12 @@ public class DorisOptions { public boolean isEnableDelete() { return enableDelete; } + + public int getMaxRetries() { + return maxRetries; + } + + public int getRetryIntervalMs() { + return retryIntervalMs; + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java index 0dd7315..53442ca 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java @@ -89,6 +89,12 @@ public class DorisSinkConnectorConfig { public static final String DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT = SchemaEvolutionMode.NONE.getName(); + public static final String MAX_RETRIES = "max.retries"; + public static final int MAX_RETRIES_DEFAULT = 10; + + public static final String RETRY_INTERVAL_MS = "retry.interval.ms"; + public static final int RETRY_INTERVAL_MS_DEFAULT = 6000; + // metrics public static final String JMX_OPT = "jmx"; public static final boolean JMX_OPT_DEFAULT = true; @@ -116,6 +122,9 @@ public class DorisSinkConnectorConfig { setFieldToDefaultValues( config, DEBEZIUM_SCHEMA_EVOLUTION, DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT); setFieldToDefaultValues(config, JMX_OPT, String.valueOf(JMX_OPT_DEFAULT)); + setFieldToDefaultValues(config, MAX_RETRIES, String.valueOf(MAX_RETRIES_DEFAULT)); + setFieldToDefaultValues( + config, RETRY_INTERVAL_MS, String.valueOf(RETRY_INTERVAL_MS_DEFAULT)); } public static Map<String, String> convertToLowercase(Map<String, String> config) { @@ -270,7 +279,19 @@ public class DorisSinkConnectorConfig { Type.STRING, LOAD_MODEL_DEFAULT, Importance.HIGH, - "load model is stream_load."); + "load model is stream_load.") + .define( + MAX_RETRIES, + Type.INT, + MAX_RETRIES_DEFAULT, + Importance.MEDIUM, + "The maximum number of times to retry on errors before failing the task.") + .define( + RETRY_INTERVAL_MS, + Type.INT, + RETRY_INTERVAL_MS_DEFAULT, + Importance.MEDIUM, + "The time in milliseconds to wait following an error before a retry attempt is made."); } public static class TopicToTableValidator implements ConfigDef.Validator { diff --git a/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java b/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java index 5d29337..b66c416 100644 --- a/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java +++ b/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java @@ -97,6 +97,10 @@ public class RespContent { return message; } + public String getLabel() { + return label; + } + public String getExistingJobStatus() { return existingJobStatus; } diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java index c6611bc..d7a299f 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java @@ -184,6 +184,22 @@ public class ConfigCheckUtils { configIsValid = false; } + String maxRetries = config.get(DorisSinkConnectorConfig.MAX_RETRIES); + if (!isNumeric(maxRetries) || isIllegalRange(maxRetries, 0)) { + LOG.error( + "{} cannot be empty or not a number or less than 0.", + DorisSinkConnectorConfig.MAX_RETRIES); + configIsValid = false; + } + + String retryIntervalMs = config.get(DorisSinkConnectorConfig.RETRY_INTERVAL_MS); + if (!isNumeric(retryIntervalMs) || isIllegalRange(retryIntervalMs, 0)) { + LOG.error( + "{} cannot be empty or not a number or less than 0.", + DorisSinkConnectorConfig.RETRY_INTERVAL_MS); + configIsValid = false; + } + if (!configIsValid) { throw new DorisException( "input kafka connector configuration is null, missing required values, or wrong input value"); diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java index 9481a2f..348f784 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java @@ -120,7 +120,7 @@ public abstract class DorisWriter { && record.kafkaOffset() > processedOffset.get()) { SinkRecord dorisRecord = record; RecordBuffer tmpBuff = null; - processedOffset.set(dorisRecord.kafkaOffset()); + putBuffer(dorisRecord); if (buffer.getBufferSizeBytes() >= dorisOptions.getFileSize() || (dorisOptions.getRecordNum() != 0 @@ -132,6 +132,7 @@ public abstract class DorisWriter { if (tmpBuff != null) { flush(tmpBuff); } + processedOffset.set(dorisRecord.kafkaOffset()); } else { LOG.warn( "The record offset is smaller than processedOffset. recordOffset={}, offsetPersistedInDoris={}, processedOffset={}", diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java index 9ab4b8d..f3a3a62 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java @@ -102,6 +102,9 @@ public class DorisStreamLoad extends DataLoad { LOG.info("load Result {}", loadResult); KafkaRespContent respContent = OBJECT_MAPPER.readValue(loadResult, KafkaRespContent.class); + if (respContent == null || respContent.getMessage() == null) { + throw new StreamLoadException("response error : " + loadResult); + } if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { String errMsg = String.format( diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java index 9e46cc1..5caa668 100644 --- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java +++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java @@ -260,6 +260,20 @@ public class TestDorisSinkConnectorConfig { ConfigCheckUtils.validateConfig(config); } + @Test(expected = DorisException.class) + public void testMaxRetryException() { + Map<String, String> config = getConfig(); + config.put(DorisSinkConnectorConfig.MAX_RETRIES, "abc"); + ConfigCheckUtils.validateConfig(config); + } + + @Test(expected = DorisException.class) + public void testRetryIntervalMsException() { + Map<String, String> config = getConfig(); + config.put(DorisSinkConnectorConfig.RETRY_INTERVAL_MS, "abc"); + ConfigCheckUtils.validateConfig(config); + } + @Test public void testSchemaEvolutionMode() { Map<String, String> config = getConfig(); diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java index c328044..e626538 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java @@ -44,6 +44,7 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; public class DorisContainerServiceImpl implements DorisContainerService { protected static final Logger LOG = LoggerFactory.getLogger(DorisContainerServiceImpl.class); @@ -69,14 +70,23 @@ public class DorisContainerServiceImpl implements DorisContainerService { .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))) - .withExposedPorts(8030, 9030, 8040, 9060); + // use customer conf + .withCopyFileToContainer( + MountableFile.forClasspathResource("docker/doris/be.conf"), + "/opt/apache-doris/be/conf/be.conf") + .withCopyFileToContainer( + MountableFile.forClasspathResource("docker/doris/fe.conf"), + "/opt/apache-doris/fe/conf/fe.conf") + .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610); container.setPortBindings( Lists.newArrayList( String.format("%s:%s", "8030", "8030"), String.format("%s:%s", "9030", "9030"), String.format("%s:%s", "9060", "9060"), - String.format("%s:%s", "8040", "8040"))); + String.format("%s:%s", "8040", "8040"), + String.format("%s:%s", "9611", "9611"), + String.format("%s:%s", "9610", "9610"))); return container; } diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java index 78fd626..ad0b84a 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java @@ -24,6 +24,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; @@ -36,12 +37,19 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.e2e.doris.DorisContainerService; import org.apache.doris.kafka.connector.e2e.doris.DorisContainerServiceImpl; import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerService; import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerServiceImpl; import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -195,4 +203,51 @@ public abstract class AbstractKafka2DorisSink { LOG.info("actual result: {}", Arrays.toString(actual.toArray())); Assert.assertArrayEquals(expected.toArray(), actual.toArray()); } + + protected void faultInjectionOpen() throws IOException { + String pointName = "FlushToken.submit_flush_error"; + String apiUrl = + String.format( + "http://%s:%s/api/debug_point/add/%s", + dorisContainerService.getInstanceHost(), 8040, pointName); + HttpPost httpPost = new HttpPost(apiUrl); + httpPost.addHeader(HttpHeaders.AUTHORIZATION, auth(USERNAME, PASSWORD)); + try (CloseableHttpClient httpClient = HttpClients.custom().build()) { + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + int statusCode = response.getStatusLine().getStatusCode(); + String reason = response.getStatusLine().toString(); + if (statusCode == 200 && response.getEntity() != null) { + LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity())); + } else { + LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason); + } + } + } + } + + protected void faultInjectionClear() throws IOException { + String apiUrl = + String.format( + "http://%s:%s/api/debug_point/clear", + dorisContainerService.getInstanceHost(), 8040); + HttpPost httpPost = new HttpPost(apiUrl); + httpPost.addHeader(HttpHeaders.AUTHORIZATION, auth(USERNAME, PASSWORD)); + try (CloseableHttpClient httpClient = HttpClients.custom().build()) { + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + int statusCode = response.getStatusLine().getStatusCode(); + String reason = response.getStatusLine().toString(); + if (statusCode == 200 && response.getEntity() != null) { + LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity())); + } else { + LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason); + } + } + } + } + + protected String auth(String user, String password) { + final String authInfo = user + ":" + password; + byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encoded); + } } diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java new file mode 100644 index 0000000..c81055f --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.sink.stringconverter; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.StringJoiner; +import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.doris.kafka.connector.utils.ConfigCheckUtils; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** DorisSinkFailoverSinkTest is a test class for Doris Sink Connector. */ +public class DorisSinkFailoverSinkTest extends AbstractStringE2ESinkTest { + private static final Logger LOG = LoggerFactory.getLogger(DorisSinkFailoverSinkTest.class); + private static String connectorName; + private static String jsonMsgConnectorContent; + private static DorisOptions dorisOptions; + private static String database; + + @BeforeClass + public static void setUp() { + initServer(); + initProducer(); + } + + public static void initialize(String connectorPath) { + jsonMsgConnectorContent = loadContent(connectorPath); + JsonNode rootNode = null; + try { + rootNode = objectMapper.readTree(jsonMsgConnectorContent); + } catch (IOException e) { + throw new DorisException("Failed to read content body.", e); + } + connectorName = rootNode.get(NAME).asText(); + JsonNode configNode = rootNode.get(CONFIG); + Map<String, String> configMap = objectMapper.convertValue(configNode, Map.class); + configMap.put(ConfigCheckUtils.TASK_ID, "1"); + Map<String, String> lowerCaseConfigMap = + DorisSinkConnectorConfig.convertToLowercase(configMap); + DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap); + dorisOptions = new DorisOptions(lowerCaseConfigMap); + database = dorisOptions.getDatabase(); + createDatabase(database); + setTimeZone(); + } + + private static void setTimeZone() { + executeSql(getJdbcConnection(), "set global time_zone = 'Asia/Shanghai'"); + } + + /** mock streamload failure */ + @Test + public void testStreamLoadFailoverSink() throws Exception { + LOG.info("start to test testStreamLoadFailoverSink."); + initialize("src/test/resources/e2e/string_converter/string_msg_failover_connector.json"); + Thread.sleep(5000); + String topic = "string_test_failover"; + String msg1 = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}"; + produceMsg2Kafka(topic, msg1); + + String tableSql = + loadContent("src/test/resources/e2e/string_converter/string_msg_tab_failover.sql"); + createTable(tableSql); + + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + String table = dorisOptions.getTopicMapTable(topic); + String querySql = + String.format("select id,name,age from %s.%s order by id", database, table); + LOG.info("start to query result from doris. sql={}", querySql); + while (true) { + List<String> result = executeSQLStatement(getJdbcConnection(), LOG, querySql, 3); + // until load success one time + if (result.size() >= 1) { + faultInjectionOpen(); + // mock new data + String msg2 = "{\"id\":2,\"name\":\"lisi\",\"age\":18}"; + produceMsg2Kafka(topic, msg2); + Thread.sleep(15000); + faultInjectionClear(); + break; + } else { + Thread.sleep(100); + } + } + + String msg3 = "{\"id\":3,\"name\":\"wangwu\",\"age\":38}"; + produceMsg2Kafka(topic, msg3); + Thread.sleep(25000); + + List<String> excepted = Arrays.asList("1,zhangsan,12", "2,lisi,18", "3,wangwu,38"); + checkResult(excepted, querySql, 3); + } + + public static List<String> executeSQLStatement( + Connection connection, Logger logger, String sql, int columnSize) { + List<String> result = new ArrayList<>(); + if (Objects.isNull(sql)) { + return result; + } + try (Statement statement = connection.createStatement()) { + logger.info("start to execute sql={}", sql); + ResultSet resultSet = statement.executeQuery(sql); + + while (resultSet.next()) { + StringJoiner sb = new StringJoiner(","); + for (int i = 1; i <= columnSize; i++) { + Object value = resultSet.getObject(i); + sb.add(String.valueOf(value)); + } + result.add(sb.toString()); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/test/resources/docker/doris/be.conf b/src/test/resources/docker/doris/be.conf new file mode 100644 index 0000000..94b76e0 --- /dev/null +++ b/src/test/resources/docker/doris/be.conf @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CUR_DATE=`date +%Y%m%d-%H%M%S` + +PPROF_TMPDIR="$DORIS_HOME/log/" + +JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" + +# For jdk 9+, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" + +# For jdk 17+, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED" + +# since 1.2, the JAVA_HOME need to be set to run BE process. +# JAVA_HOME=/path/to/jdk/ + +# https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile +# https://jemalloc.net/jemalloc.3.html +JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:false,lg_prof_interval:32,lg_prof_sample:19,prof_gdump:false,prof_accum:false,prof_leak:false,prof_final:false" +JEMALLOC_PROF_PRFIX="" + +# INFO, WARNING, ERROR, FATAL +sys_log_level = INFO + +# ports for admin, web, heartbeat service +be_port = 9060 +webserver_port = 8040 +heartbeat_service_port = 9050 +brpc_port = 8060 +arrow_flight_sql_port = 9610 +enable_debug_points = true + +# HTTPS configures +enable_https = false +# path of certificate in PEM format. +ssl_certificate_path = "$DORIS_HOME/conf/cert.pem" +# path of private key in PEM format. +ssl_private_key_path = "$DORIS_HOME/conf/key.pem" + + +# Choose one if there are more than one ip except loopback address. +# Note that there should at most one ip match this list. +# If no ip match this rule, will choose one randomly. +# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1 +# Default value is empty. +# priority_networks = 10.10.10.0/24;192.168.0.0/16 + +# data root path, separate by ';' +# You can specify the storage type for each root path, HDD (cold data) or SSD (hot data) +# eg: +# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris +# storage_root_path = /home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD +# /home/disk2/doris,medium:HDD(default) +# +# you also can specify the properties by setting '<property>:<value>', separate by ',' +# property 'medium' has a higher priority than the extension of path +# +# Default value is ${DORIS_HOME}/storage, you should create it by hand. +# storage_root_path = ${DORIS_HOME}/storage + +# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers +# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers + +# Advanced configurations +# sys_log_dir = ${DORIS_HOME}/log +# sys_log_roll_mode = SIZE-MB-1024 +# sys_log_roll_num = 10 +# sys_log_verbose_modules = * +# log_buffer_level = -1 +# palo_cgroups + +# aws sdk log level +# Off = 0, +# Fatal = 1, +# Error = 2, +# Warn = 3, +# Info = 4, +# Debug = 5, +# Trace = 6 +# Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs +aws_log_level=0 +## If you are not running in aws cloud, you can disable EC2 metadata +AWS_EC2_METADATA_DISABLED=true \ No newline at end of file diff --git a/src/test/resources/docker/doris/fe.conf b/src/test/resources/docker/doris/fe.conf new file mode 100644 index 0000000..a45fb53 --- /dev/null +++ b/src/test/resources/docker/doris/fe.conf @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +##################################################################### +## The uppercase properties are read and exported by bin/start_fe.sh. +## To see all Frontend configurations, +## see fe/src/org/apache/doris/common/Config.java +##################################################################### + +CUR_DATE=`date +%Y%m%d-%H%M%S` + +# Log dir +LOG_DIR = ${DORIS_HOME}/log + +# For jdk 17, this JAVA_OPTS will be used as default JVM options +JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR -Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED" + +# Set your own JAVA_HOME +# JAVA_HOME=/path/to/jdk/ + +## +## the lowercase properties are read by main program. +## + +# store metadata, must be created before start FE. +# Default value is ${DORIS_HOME}/doris-meta +# meta_dir = ${DORIS_HOME}/doris-meta + +# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers +# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers + +http_port = 8030 +rpc_port = 9020 +query_port = 9030 +edit_log_port = 9010 +arrow_flight_sql_port = 9611 +enable_debug_points = true +arrow_flight_token_cache_size = 50 +# Choose one if there are more than one ip except loopback address. +# Note that there should at most one ip match this list. +# If no ip match this rule, will choose one randomly. +# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1 +# Default value is empty. +# priority_networks = 10.10.10.0/24;192.168.0.0/16 + +# Advanced configurations +# log_roll_size_mb = 1024 +# INFO, WARN, ERROR, FATAL +sys_log_level = INFO +# NORMAL, BRIEF, ASYNC +sys_log_mode = ASYNC +# sys_log_roll_num = 10 +# sys_log_verbose_modules = org.apache.doris +# audit_log_dir = $LOG_DIR +# audit_log_modules = slow_query, query +# audit_log_roll_num = 10 +# meta_delay_toleration_second = 10 +# qe_max_connection = 1024 +# qe_query_timeout_second = 300 +# qe_slow_log_ms = 5000 \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/string_msg_failover_connector.json b/src/test/resources/e2e/string_converter/string_msg_failover_connector.json new file mode 100644 index 0000000..d8604b5 --- /dev/null +++ b/src/test/resources/e2e/string_converter/string_msg_failover_connector.json @@ -0,0 +1,25 @@ +{ + "name":"string_msg_failover_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"string_test_failover", + "tasks.max":"1", + "doris.topic2table.map": "string_test_failover:string_msg_tab_failover", + "buffer.count.records":"1", + "buffer.flush.time":"1200", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"string_msg_failover", + "load.model":"stream_load", + "delivery.guarantee":"exactly_once", + "enable.2pc": "true", + "max.retries": "10", + "retry.interval.ms": "5000", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql b/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql new file mode 100644 index 0000000..155f217 --- /dev/null +++ b/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql @@ -0,0 +1,12 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. +CREATE TABLE string_msg_failover.string_msg_tab_failover ( + id INT NULL, + name VARCHAR(100) NULL, + age INT NULL +) ENGINE=OLAP +DUPLICATE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org