This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f5ea699  [improve] update config default value and add combine flush mode (#73)
f5ea699 is described below

commit f5ea699aec317bd28ff322e6d9039f3c802444d7
Author: wudi <676366...@qq.com>
AuthorDate: Mon May 12 11:40:30 2025 +0800

    [improve] update config default value and add combine flush mode (#73)
---
 .../doris/kafka/connector/DorisSinkTask.java       |   2 +-
 .../doris/kafka/connector/cfg/DorisOptions.java    |  26 +++-
 .../connector/cfg/DorisSinkConnectorConfig.java    |   9 +-
 .../service/DorisCombinedSinkService.java          | 143 +++++++++++++++++++++
 .../connector/service/DorisDefaultSinkService.java |  14 +-
 .../connector/service/DorisSinkServiceFactory.java |   9 +-
 .../kafka/connector/utils/ConfigCheckUtils.java    |  25 +++-
 .../doris/kafka/connector/writer/DorisWriter.java  |  11 +-
 .../e2e/sink/stringconverter/StringMsgE2ETest.java |  62 +++++++++
 .../string_converter/combine_flush_connector.json  |  22 ++++
 .../combine_flush_connector_2pc.json               |  23 ++++
 .../e2e/string_converter/combine_flush_tab.sql     |  12 ++
 .../e2e/string_converter/combine_flush_tab_2pc.sql |  12 ++
 13 files changed, 347 insertions(+), 23 deletions(-)
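
For orientation before the diff: the combine flush mode added here is switched on by a single new connector property, alongside raised buffer defaults. Below is a minimal sketch of a sink config that opts in, using only key names and defaults visible in this commit; everything else about the connector setup is assumed.

    import java.util.HashMap;
    import java.util.Map;

    public class CombineFlushConfigSketch {
        // Hypothetical example config; only the keys and defaults below are
        // taken from this commit, the rest of the connector setup is assumed.
        public static Map<String, String> sinkConfig() {
            Map<String, String> config = new HashMap<>();
            config.put("enable.combine.flush", "true");  // new key, defaults to "false"
            config.put("buffer.count.records", "50000"); // new default, was 10000
            config.put("buffer.size.bytes", String.valueOf(100 * 1024 * 1024)); // new default, was 5000000
            config.put("buffer.flush.time", "120");      // default unchanged; allowed minimum drops from 10s to 1s
            return config;
        }
    }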

diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index 576de26..5f97b29 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -55,7 +55,7 @@ public class DorisSinkTask extends SinkTask {
         LOG.info("kafka doris sink task start with {}", parsedConfig);
         this.options = new DorisOptions(parsedConfig);
         this.remainingRetries = options.getMaxRetries();
-        this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig, context);
+        this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig, context, options);
     }
 
     /**
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 922eee6..aa5402c 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -70,6 +70,7 @@ public class DorisOptions {
     private final int maxRetries;
     private final int retryIntervalMs;
     private final BehaviorOnNullValues behaviorOnNullValues;
+    private final boolean enableCombineFlush;
 
     public DorisOptions(Map<String, String> config) {
         this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -84,6 +85,8 @@ public class DorisOptions {
         this.loadModel = LoadModel.of(config.get(DorisSinkConnectorConfig.LOAD_MODEL));
         this.deliveryGuarantee =
                 DeliveryGuarantee.of(config.get(DorisSinkConnectorConfig.DELIVERY_GUARANTEE));
+        this.enableCombineFlush =
+                Boolean.valueOf(config.get(DorisSinkConnectorConfig.ENABLE_COMBINE_FLUSH));
         this.converterMode = ConverterMode.of(config.get(DorisSinkConnectorConfig.CONVERTER_MODE));
         this.schemaEvolutionMode =
                 SchemaEvolutionMode.of(
@@ -98,14 +101,21 @@ public class DorisOptions {
                         config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP));
         this.tableNameField = config.get(DorisSinkConnectorConfig.RECORD_TABLE_NAME_FIELD);
 
-        if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
-            if (Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) {
-                this.enable2PC = true;
-                this.force2PC = true;
-            } else {
-                this.enable2PC = false;
+        if (enableCombineFlush) {
+            LOG.info("Enable combine flush, set 2pc to false.");
+            this.enable2PC = false;
+            this.force2PC = false;
+        } else {
+            if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
+                if (Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) {
+                    this.enable2PC = true;
+                    this.force2PC = true;
+                } else {
+                    this.enable2PC = false;
+                }
             }
         }
+
         this.enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
         this.enableDelete =
                 Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE));
@@ -350,4 +360,8 @@ public class DorisOptions {
     public BehaviorOnNullValues getBehaviorOnNullValues() {
         return behaviorOnNullValues;
     }
+
+    public boolean isEnableCombineFlush() {
+        return enableCombineFlush;
+    }
 }
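
The net effect of the DorisOptions change above is that enable.combine.flush takes precedence over enable.2pc. A condensed restatement of that precedence follows (a standalone sketch, not the actual class; note the hunk does not show what enable2PC defaults to when the key is absent, so a false default is assumed here):

    import java.util.Map;

    final class Enable2PCPrecedenceSketch {
        // Condensed restatement of the DorisOptions logic above; assumes a false
        // default when enable.2pc is unset, which this hunk does not show.
        static boolean effectiveEnable2PC(Map<String, String> config) {
            boolean enableCombineFlush =
                    Boolean.parseBoolean(config.get("enable.combine.flush"));
            return !enableCombineFlush
                    && Boolean.parseBoolean(config.getOrDefault("enable.2pc", "false"));
        }
    }
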
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 5b8807e..97bc125 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -48,16 +48,16 @@ public class DorisSinkConnectorConfig {
     // Connector config
     private static final String CONNECTOR_CONFIG = "Connector Config";
     public static final String BUFFER_COUNT_RECORDS = "buffer.count.records";
-    public static final long BUFFER_COUNT_RECORDS_DEFAULT = 10000;
+    public static final long BUFFER_COUNT_RECORDS_DEFAULT = 50000;
     public static final String BUFFER_SIZE_BYTES = "buffer.size.bytes";
-    public static final long BUFFER_SIZE_BYTES_DEFAULT = 5000000;
+    public static final long BUFFER_SIZE_BYTES_DEFAULT = 100 * 1024 * 1024;
     public static final long BUFFER_SIZE_BYTES_MIN = 1;
     public static final String TOPICS_TABLES_MAP = "doris.topic2table.map";
     public static final String RECORD_TABLE_NAME_FIELD = "record.tablename.field";
     public static final String LABEL_PREFIX = "label.prefix";
 
     // Time in seconds
-    public static final long BUFFER_FLUSH_TIME_SEC_MIN = 10;
+    public static final long BUFFER_FLUSH_TIME_SEC_MIN = 1;
     public static final long BUFFER_FLUSH_TIME_SEC_DEFAULT = 120;
     public static final String BUFFER_FLUSH_TIME_SEC = "buffer.flush.time";
 
@@ -81,6 +81,8 @@ public class DorisSinkConnectorConfig {
     public static final String AUTO_REDIRECT = "auto.redirect";
     public static final String DELIVERY_GUARANTEE = "delivery.guarantee";
     public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name();
+    public static final String ENABLE_COMBINE_FLUSH = "enable.combine.flush";
+    public static final String ENABLE_COMBINE_FLUSH_DEFAULT = "false";
     public static final String CONVERTER_MODE = "converter.mode";
     public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName();
 
@@ -130,6 +132,7 @@ public class DorisSinkConnectorConfig {
         setFieldToDefaultValues(
                 config, RETRY_INTERVAL_MS, String.valueOf(RETRY_INTERVAL_MS_DEFAULT));
         setFieldToDefaultValues(config, BEHAVIOR_ON_NULL_VALUES, BEHAVIOR_ON_NULL_VALUES_DEFAULT);
+        setFieldToDefaultValues(config, ENABLE_COMBINE_FLUSH, ENABLE_COMBINE_FLUSH_DEFAULT);
     }
 
     public static Map<String, String> convertToLowercase(Map<String, String> config) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
new file mode 100644
index 0000000..711b405
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.service;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.doris.kafka.connector.writer.DorisWriter;
+import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Combines records from all partitions of a topic and writes them in a single flush. */
+public class DorisCombinedSinkService extends DorisDefaultSinkService {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisCombinedSinkService.class);
+
+    private final Map<String, HashMap<Integer, Long>> topicPartitionOffset;
+
+    DorisCombinedSinkService(Map<String, String> config, SinkTaskContext context) {
+        super(config, context);
+        this.topicPartitionOffset = new HashMap<>();
+    }
+
+    /**
+     * Create new task
+     *
+     * @param tableName destination table name in doris
+     * @param topicPartition TopicPartition passed from Kafka
+     */
+    @Override
+    public void startTask(final String tableName, final TopicPartition topicPartition) {
+        // check if the task is already started
+        String writerKey = getWriterKey(topicPartition.topic(), tableName);
+        if (writer.containsKey(writerKey)) {
+            LOG.info("already start task with key {}", writerKey);
+        } else {
+            String topic = topicPartition.topic();
+
+            // combined mode keys writers by topic only, so use -1 as a placeholder partition
+            int partition = -1;
+            DorisWriter dorisWriter =
+                    new StreamLoadWriter(
+                            tableName, topic, partition, dorisOptions, conn, connectMonitor);
+            writer.put(writerKey, dorisWriter);
+            metricsJmxReporter.start();
+        }
+    }
+
+    @Override
+    public void insert(final Collection<SinkRecord> records) {
+        // note that records can be empty
+        for (SinkRecord record : records) {
+            // skip records
+            if (shouldSkipRecord(record)) {
+                continue;
+            }
+            // check topic mutating SMTs
+            checkTopicMutating(record);
+
+            String topic = record.topic();
+            int partition = record.kafkaPartition();
+            topicPartitionOffset
+                    .computeIfAbsent(topic, k -> new HashMap<>())
+                    .put(partition, record.kafkaOffset());
+            // a record-count or buffer-size based flush may happen inside this call
+            insert(record);
+        }
+
+        // check all sink writers to see if they need to be flushed
+        for (DorisWriter writer : writer.values()) {
+            // Time based flushing
+            if (writer.shouldFlush()) {
+                writer.flushBuffer();
+            }
+        }
+    }
+
+    @Override
+    public void insert(SinkRecord record) {
+        String tableName = getSinkDorisTableName(record);
+        String writerKey = getWriterKey(record.topic(), tableName);
+        // init a new topic partition
+        if (!writer.containsKey(writerKey)) {
+            startTask(tableName, new TopicPartition(record.topic(), -1));
+        }
+        writer.get(writerKey).insert(record);
+    }
+
+    @Override
+    public long getOffset(final TopicPartition topicPartition) {
+        String topic = topicPartition.topic();
+        int partition = topicPartition.partition();
+        if (topicPartitionOffset.containsKey(topic)
+                && topicPartitionOffset.get(topic).containsKey(partition)) {
+            return topicPartitionOffset.get(topic).get(partition);
+        }
+        return 0;
+    }
+
+    @Override
+    public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        // Here we force flushing the data in memory once to
+        // ensure that the offsets recorded in topicPartitionOffset have been flushed to doris
+        for (DorisWriter writer : writer.values()) {
+            writer.flushBuffer();
+        }
+    }
+
+    /**
+     * Build the unique writer key
+     *
+     * @param topic topic name
+     * @param tableName table name
+     * @return writer key
+     */
+    private String getWriterKey(String topic, String tableName) {
+        if (dorisOptions.getTopicMapTable(topic).equals(tableName)) {
+            return topic;
+        }
+        return topic + "_" + tableName;
+    }
+}
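
A note on the writer-key scheme in getWriterKey above: when the topic's configured mapping already points at tableName, the topic alone identifies the writer; otherwise the table name is appended. A standalone illustration (not the real method; mappedTable stands in for dorisOptions.getTopicMapTable(topic)):

    final class WriterKeySketch {
        // Standalone sketch of the key scheme shown above.
        static String writerKey(String topic, String tableName, String mappedTable) {
            return mappedTable.equals(tableName) ? topic : topic + "_" + tableName;
        }
    }
    // writerKey("combine_test", "combine_flush_tab", "combine_flush_tab") -> "combine_test"
    // writerKey("combine_test", "other_tab", "combine_flush_tab") -> "combine_test_other_tab"
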
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index e091264..68f0d34 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -64,13 +64,13 @@ import org.slf4j.LoggerFactory;
 public class DorisDefaultSinkService implements DorisSinkService {
     private static final Logger LOG = LoggerFactory.getLogger(DorisDefaultSinkService.class);
 
-    private final ConnectionProvider conn;
-    private final Map<String, DorisWriter> writer;
-    private final DorisOptions dorisOptions;
-    private final MetricsJmxReporter metricsJmxReporter;
-    private final DorisConnectMonitor connectMonitor;
-    private final ObjectMapper objectMapper;
-    private final SinkTaskContext context;
+    protected final ConnectionProvider conn;
+    protected final Map<String, DorisWriter> writer;
+    protected final DorisOptions dorisOptions;
+    protected final MetricsJmxReporter metricsJmxReporter;
+    protected final DorisConnectMonitor connectMonitor;
+    protected final ObjectMapper objectMapper;
+    protected final SinkTaskContext context;
 
     DorisDefaultSinkService(Map<String, String> config, SinkTaskContext context) {
         this.dorisOptions = new DorisOptions(config);
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
index dbf7192..130ea06 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java
@@ -20,13 +20,18 @@
 package org.apache.doris.kafka.connector.service;
 
 import java.util.Map;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 
 /** A factory to create {@link DorisSinkService} */
 public class DorisSinkServiceFactory {
 
     public static DorisSinkService getDorisSinkService(
-            Map<String, String> connectorConfig, SinkTaskContext context) {
-        return new DorisDefaultSinkService(connectorConfig, context);
+            Map<String, String> connectorConfig, SinkTaskContext context, DorisOptions options) {
+        if (options.isEnableCombineFlush()) {
+            return new DorisCombinedSinkService(connectorConfig, context);
+        } else {
+            return new DorisDefaultSinkService(connectorConfig, context);
+        }
     }
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index 465b7dc..e1588c3 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -144,7 +144,7 @@ public class ConfigCheckUtils {
                 || isIllegalRange(
                         bufferFlushTime, DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN)) {
             LOG.error(
-                    "{} cannot be empty or not a number or less than 10.",
+                    "{} cannot be empty or not a number or less than 1.",
                     DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
             configIsValid = false;
         }
@@ -162,11 +162,32 @@ public class ConfigCheckUtils {
         if (!validateEnumInstances(deliveryGuarantee, DeliveryGuarantee.instances())) {
             LOG.error(
                     "The value of {} is an illegal parameter of {}.",
-                    loadModel,
+                    deliveryGuarantee,
                     DorisSinkConnectorConfig.DELIVERY_GUARANTEE);
             configIsValid = false;
         }
 
+        String enableCombineFlush = config.get(DorisSinkConnectorConfig.ENABLE_COMBINE_FLUSH);
+        if (!validateEnumInstances(enableCombineFlush, new String[] {"true", 
"false"})) {
+            LOG.error(
+                    "The value of {} is an illegal parameter of {}.",
+                    enableCombineFlush,
+                    DorisSinkConnectorConfig.ENABLE_COMBINE_FLUSH);
+            configIsValid = false;
+        }
+
+        if (configIsValid
+                && Boolean.parseBoolean(enableCombineFlush)
+                && 
DeliveryGuarantee.EXACTLY_ONCE.name().equalsIgnoreCase(deliveryGuarantee)) {
+            LOG.error(
+                    "The value of {} is not supported set {} when {} is set to 
{}.",
+                    DorisSinkConnectorConfig.ENABLE_COMBINE_FLUSH,
+                    enableCombineFlush,
+                    DorisSinkConnectorConfig.DELIVERY_GUARANTEE,
+                    DeliveryGuarantee.EXACTLY_ONCE.name());
+            configIsValid = false;
+        }
+
         String converterMode = config.get(DorisSinkConnectorConfig.CONVERTER_MODE);
         if (!validateEnumInstances(converterMode, ConverterMode.instances())) {
             LOG.error(
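
For reference, here is the combination the new check above rejects, as a hypothetical config fragment (key names from this commit; the delivery.guarantee comparison is case-insensitive, so "EXACTLY_ONCE" matches as well):

    // Hypothetical fragment rejected by the new check in ConfigCheckUtils:
    Map<String, String> invalid = new HashMap<>();
    invalid.put("enable.combine.flush", "true");
    invalid.put("delivery.guarantee", "exactly_once");
    // validation would log the error above and mark this config invalid
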
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 348f784..9e5cfb9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -116,8 +116,11 @@ public abstract class DorisWriter {
 
     protected void insertRecord(final SinkRecord record) {
         // discard the record if the record offset is smaller or equal to server side offset
-        if (record.kafkaOffset() > this.offsetPersistedInDoris.get()
-                && record.kafkaOffset() > processedOffset.get()) {
+        // when enable.combine.flush=true, no verification is required because the offsets of
+        // multiple partitions cannot be compared.
+        if (dorisOptions.isEnableCombineFlush()
+                || (record.kafkaOffset() > this.offsetPersistedInDoris.get()
+                        && record.kafkaOffset() > processedOffset.get())) {
             SinkRecord dorisRecord = record;
             RecordBuffer tmpBuff = null;
 
@@ -130,6 +133,10 @@ public abstract class DorisWriter {
             }
 
             if (tmpBuff != null) {
+                LOG.info(
+                        "trigger flush by buffer size or count, buffer size: 
{}, num of records: {}",
+                        tmpBuff.getBufferSizeBytes(),
+                        tmpBuff.getNumOfRecords());
                 flush(tmpBuff);
             }
             processedOffset.set(dorisRecord.kafkaOffset());
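
Why insertRecord now skips the offset guard in combined mode: a single combined writer receives interleaved offsets from many partitions, so one monotonic high-water mark would wrongly drop records. A toy simulation with assumed offsets (not taken from the commit):

    // Offsets from partition 0 (100, 101) and partition 1 (5, 6) interleave
    // at one combined writer; the per-partition guard rejects partition 1.
    long processed = -1L;
    for (long offset : new long[] {100L, 5L, 101L, 6L}) {
        boolean accepted = offset > processed; // the guard combined mode skips
        if (accepted) {
            processed = offset;
        }
        // offsets 5 and 6 are rejected here even though they are new records
    }
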
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 6d073c2..056f513 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -411,6 +411,68 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         checkResult(expected, query1, 3);
     }
 
+    @Test
+    public void testCombineFlush() throws IOException, InterruptedException, SQLException {
+        initialize("src/test/resources/e2e/string_converter/combine_flush_connector.json");
+        Thread.sleep(5000);
+        String topic = "combine_test";
+        String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
+
+        produceMsg2Kafka(topic, msg);
+        String tableSql =
+                loadContent("src/test/resources/e2e/string_converter/combine_flush_tab.sql");
+        createTable(tableSql);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        Statement statement = getJdbcConnection().createStatement();
+        String querySql = "select * from " + database + "." + table;
+        LOG.info("start to query result from doris. sql={}", querySql);
+        ResultSet resultSet = statement.executeQuery(querySql);
+
+        Assert.assertTrue(resultSet.next());
+
+        int id = resultSet.getInt("id");
+        String name = resultSet.getString("name");
+        int age = resultSet.getInt("age");
+        LOG.info("Query result is id={}, name={}, age={}", id, name, age);
+
+        Assert.assertEquals(1, id);
+        Assert.assertEquals("zhangsan", name);
+        Assert.assertEquals(12, age);
+    }
+
+    @Test
+    public void testCombineFlush2PC() throws IOException, InterruptedException, SQLException {
+        initialize("src/test/resources/e2e/string_converter/combine_flush_connector_2pc.json");
+        Thread.sleep(5000);
+        String topic = "combine_test_2pc";
+        String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
+
+        produceMsg2Kafka(topic, msg);
+        String tableSql =
+                loadContent("src/test/resources/e2e/string_converter/combine_flush_tab_2pc.sql");
+        createTable(tableSql);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        Statement statement = getJdbcConnection().createStatement();
+        String querySql = "select * from " + database + "." + table;
+        LOG.info("start to query result from doris. sql={}", querySql);
+        ResultSet resultSet = statement.executeQuery(querySql);
+
+        Assert.assertTrue(resultSet.next());
+
+        int id = resultSet.getInt("id");
+        String name = resultSet.getString("name");
+        int age = resultSet.getInt("age");
+        LOG.info("Query result is id={}, name={}, age={}", id, name, age);
+
+        Assert.assertEquals(1, id);
+        Assert.assertEquals("zhangsan", name);
+        Assert.assertEquals(12, age);
+    }
+
     @AfterClass
     public static void closeInstance() {
         kafkaContainerService.deleteKafkaConnector(connectorName);
diff --git a/src/test/resources/e2e/string_converter/combine_flush_connector.json b/src/test/resources/e2e/string_converter/combine_flush_connector.json
new file mode 100644
index 0000000..96c3cf8
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/combine_flush_connector.json
@@ -0,0 +1,22 @@
+{
+  "name":"combine_flush_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"combine_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": "combine_test:combine_flush_tab",
+    "buffer.count.records":"100",
+    "buffer.flush.time":"1",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"combine_flush",
+    "load.model":"stream_load",
+    "enable.combine.flush":"true",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/combine_flush_connector_2pc.json b/src/test/resources/e2e/string_converter/combine_flush_connector_2pc.json
new file mode 100644
index 0000000..066ef3b
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/combine_flush_connector_2pc.json
@@ -0,0 +1,23 @@
+{
+  "name":"combine_flush_connector_2pc",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"combine_test_2pc",
+    "tasks.max":"1",
+    "doris.topic2table.map": "combine_test_2pc:combine_flush_tab_2pc",
+    "buffer.count.records":"100",
+    "buffer.flush.time":"1",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"combine_flush_2pc",
+    "load.model":"stream_load",
+    "enable.combine.flush":"true",
+    "enable.2pc": "true",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/combine_flush_tab.sql b/src/test/resources/e2e/string_converter/combine_flush_tab.sql
new file mode 100644
index 0000000..88180fa
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/combine_flush_tab.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE combine_flush.combine_flush_tab (
+  id INT NULL,
+  name VARCHAR(100) NULL,
+  age INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/combine_flush_tab_2pc.sql b/src/test/resources/e2e/string_converter/combine_flush_tab_2pc.sql
new file mode 100644
index 0000000..b946239
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/combine_flush_tab_2pc.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE combine_flush_2pc.combine_flush_tab_2pc (
+  id INT NULL,
+  name VARCHAR(100) NULL,
+  age INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
