This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ace2e6  [feature]support stream load with group commit mode (#35)
4ace2e6 is described below

commit 4ace2e65a7c0bf383c14010e7a4823e5fd5e0aaf
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Tue Jul 9 15:13:16 2024 +0800

    [feature]support stream load with group commit mode (#35)
---
 .../doris/kafka/connector/cfg/DorisOptions.java    | 27 +++++++----
 .../kafka/connector/utils/ConfigCheckUtils.java    | 27 +++++++++++
 .../kafka/connector/writer/LoadConstants.java      |  4 ++
 .../connector/writer/load/DorisStreamLoad.java     | 23 +++++++--
 .../GroupCommitMode.java}                          | 32 ++++++++++---
 .../cfg/TestDorisSinkConnectorConfig.java          | 52 ++++++++++++++++++++
 .../e2e/sink/stringconverter/StringMsgE2ETest.java | 55 ++++++++++++++++++++--
 .../string_converter/group_commit_connector.json   | 23 +++++++++
 .../e2e/string_converter/group_commit_tab.sql      | 13 +++++
 9 files changed, 234 insertions(+), 22 deletions(-)

diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 69cbb80..e8c1933 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -55,8 +55,9 @@ public class DorisOptions {
     private boolean autoRedirect = true;
     private int requestReadTimeoutMs;
     private int requestConnectTimeoutMs;
+    private boolean enableGroupCommit;
     /** Properties for the StreamLoad. */
-    private final Properties streamLoadProp = new Properties();
+    private final Properties streamLoadProp;
 
     @Deprecated private String labelPrefix;
     private final String databaseTimeZone;
@@ -113,25 +114,31 @@ public class DorisOptions {
             this.requestReadTimeoutMs =
                     
Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS));
         }
-        getStreamLoadPropFromConfig(config);
+        this.streamLoadProp = getStreamLoadPropFromConfig(config);
+        this.enableGroupCommit =
+                ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(), 
enable2PC());
     }
 
-    private void getStreamLoadPropFromConfig(Map<String, String> config) {
-        setStreamLoadDefaultValues();
+    private Properties getStreamLoadPropFromConfig(Map<String, String> config) 
{
+        Properties properties = new Properties();
+        properties.putAll(getStreamLoadDefaultValues());
         for (Map.Entry<String, String> entry : config.entrySet()) {
             if 
(entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) {
                 String subKey =
                         entry.getKey()
                                 .substring(
                                         
DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length());
-                streamLoadProp.put(subKey, entry.getValue());
+                properties.put(subKey, entry.getValue());
             }
         }
+        return properties;
     }
 
-    private void setStreamLoadDefaultValues() {
-        streamLoadProp.setProperty("format", "json");
-        streamLoadProp.setProperty("read_json_by_line", "true");
+    private Properties getStreamLoadDefaultValues() {
+        Properties properties = new Properties();
+        properties.setProperty("format", "json");
+        properties.setProperty("read_json_by_line", "true");
+        return properties;
     }
 
     public String getName() {
@@ -182,6 +189,10 @@ public class DorisOptions {
         return enable2PC;
     }
 
+    public boolean enableGroupCommit() {
+        return enableGroupCommit;
+    }
+
     public Map<String, String> getTopicMap() {
         return topicMap;
     }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java 
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index ca8ec31..51b8b06 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -19,8 +19,11 @@
 
 package org.apache.doris.kafka.connector.utils;
 
+import static 
org.apache.doris.kafka.connector.writer.LoadConstants.PARTIAL_COLUMNS;
+
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Pattern;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.converter.ConverterMode;
@@ -28,6 +31,8 @@ import 
org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
 import org.apache.doris.kafka.connector.exception.ArgumentsException;
 import org.apache.doris.kafka.connector.exception.DorisException;
 import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
+import org.apache.doris.kafka.connector.writer.LoadConstants;
+import org.apache.doris.kafka.connector.writer.load.GroupCommitMode;
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -293,4 +298,26 @@ public class ConfigCheckUtils {
         }
         return false;
     }
+
+    public static boolean validateGroupCommitMode(Properties streamLoadProp, 
boolean enable2PC) {
+        if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) {
+            return false;
+        }
+
+        Object value = streamLoadProp.get(LoadConstants.GROUP_COMMIT);
+        String normalizedValue = value.toString().trim().toLowerCase();
+        if (!GroupCommitMode.instances().contains(normalizedValue)) {
+            throw new DorisException(
+                    "The value of group commit mode is an illegal parameter, 
illegal value="
+                            + value);
+        } else if (enable2PC) {
+            throw new DorisException(
+                    "When group commit is enabled, you should disable two 
phase commit! Please set 'enable.2pc':'false'");
+        } else if (streamLoadProp.containsKey(PARTIAL_COLUMNS)
+                && streamLoadProp.get(PARTIAL_COLUMNS).equals("true")) {
+            throw new DorisException(
+                    "When group commit is enabled, you cannot load data with 
partial column update.");
+        }
+        return true;
+    }
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
index 598cc3a..3e887eb 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
@@ -26,4 +26,8 @@ public class LoadConstants {
 
     // Special identifier, label separator used for kafka-connect sink data
     public static final String FILE_DELIM_DEFAULT = "__KC_";
+
+    // Since Apache Doris 2.1.0, stream load supports group commit mode.
+    public static final String GROUP_COMMIT = "group_commit";
+    public static final String PARTIAL_COLUMNS = "partial_columns";
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
index b26a735..6d3f770 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
@@ -53,6 +53,7 @@ public class DorisStreamLoad extends DataLoad {
     private final CloseableHttpClient httpClient = new 
HttpUtils().getHttpClient();
     private final BackendUtils backendUtils;
     private Queue<KafkaRespContent> respContents = new LinkedList<>();
+    private final boolean enableGroupCommit;
 
     public DorisStreamLoad(BackendUtils backendUtils, DorisOptions 
dorisOptions, String topic) {
         this.database = dorisOptions.getDatabase();
@@ -63,10 +64,16 @@ public class DorisStreamLoad extends DataLoad {
         this.dorisOptions = dorisOptions;
         this.backendUtils = backendUtils;
         this.topic = topic;
+        this.enableGroupCommit = dorisOptions.enableGroupCommit();
     }
 
     /** execute stream load. */
     public void load(String label, RecordBuffer buffer) throws IOException {
+
+        if (enableGroupCommit) {
+            label = null;
+        }
+
         refreshLoadUrl(database, table);
         String data = buffer.getData();
         ByteArrayEntity entity = new 
ByteArrayEntity(data.getBytes(StandardCharsets.UTF_8));
@@ -81,7 +88,12 @@ public class DorisStreamLoad extends DataLoad {
                 .enable2PC(dorisOptions.enable2PC())
                 .addProperties(dorisOptions.getStreamLoadProp());
 
-        LOG.info("stream load started for {} on host {}", label, hostPort);
+        if (enableGroupCommit) {
+            LOG.info("stream load started with group commit on host {}", 
hostPort);
+        } else {
+            LOG.info("stream load started for {} on host {}", label, hostPort);
+        }
+
         try (CloseableHttpResponse response = 
httpClient.execute(putBuilder.build())) {
             int statusCode = response.getStatusLine().getStatusCode();
             if (statusCode == 200 && response.getEntity() != null) {
@@ -101,10 +113,15 @@ public class DorisStreamLoad extends DataLoad {
                 respContent.setLastOffset(buffer.getLastOffset());
                 respContent.setTopic(topic);
                 respContents.add(respContent);
-                return;
             }
         } catch (Exception ex) {
-            String err = "failed to stream load data with label: " + label;
+            String err;
+            if (enableGroupCommit) {
+                err = "failed to stream load data with group commit";
+            } else {
+                err = "failed to stream load data with label: " + label;
+            }
+
             LOG.warn(err, ex);
             throw new StreamLoadException(err, ex);
         }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java
similarity index 57%
copy from 
src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
copy to 
src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java
index 598cc3a..869b940 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java
@@ -17,13 +17,31 @@
  * under the License.
  */
 
-package org.apache.doris.kafka.connector.writer;
+package org.apache.doris.kafka.connector.writer.load;
 
-public class LoadConstants {
-    public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
-    public static final String DORIS_DEL_TRUE = "1";
-    public static final String DORIS_DEL_FALSE = "0";
+import java.util.Arrays;
+import java.util.List;
 
-    // Special identifier, label separator used for kafka-connect sink data
-    public static final String FILE_DELIM_DEFAULT = "__KC_";
+public enum GroupCommitMode {
+    OFF_MODE("off_mode"),
+    SYNC_MODE("sync_mode"),
+    ASYNC_MODE("async_mode");
+
+    private final String name;
+
+    GroupCommitMode(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public static LoadModel of(String name) {
+        return LoadModel.valueOf(name.toUpperCase());
+    }
+
+    public static List<String> instances() {
+        return Arrays.asList(OFF_MODE.name, SYNC_MODE.name, ASYNC_MODE.name);
+    }
 }
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
 
b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index eae3ec0..8f44707 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import org.apache.doris.kafka.connector.exception.DorisException;
 import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
 import org.junit.Assert;
@@ -290,4 +291,55 @@ public class TestDorisSinkConnectorConfig {
             Assert.assertTrue(convertConfig.containsKey(s));
         }
     }
+
+    @Test(expected = DorisException.class)
+    public void testGroupCommitWithIllegalParams() {
+        Map<String, String> config = getConfig();
+        config.put("sink.properties.group_commit", "sync_modes");
+        Properties streamLoadProp = getStreamLoadPropFromConfig(config);
+        config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
+        ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testGroupCommitModeWithEnable2pc() {
+        Map<String, String> config = getConfig();
+        config.put("sink.properties.group_commit", "sync_mode");
+        Properties streamLoadProp = getStreamLoadPropFromConfig(config);
+        boolean enable2pc = 
Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
+        ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, enable2pc);
+    }
+
+    @Test(expected = DorisException.class)
+    public void testGroupCommitWithPartialUpdate() {
+        Map<String, String> config = getConfig();
+        config.put("sink.properties.group_commit", "sync_mode");
+        config.put("sink.properties.partial_columns", "true");
+        config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
+        Properties streamLoadProp = getStreamLoadPropFromConfig(config);
+        ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+    }
+
+    @Test
+    public void testGroupCommitWithAsyncMode() {
+        Map<String, String> config = getConfig();
+        config.put("sink.properties.group_commit", "async_mode");
+        Properties streamLoadProp = getStreamLoadPropFromConfig(config);
+        config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false");
+        ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false);
+    }
+
+    private Properties getStreamLoadPropFromConfig(Map<String, String> config) 
{
+        Properties streamLoadProp = new Properties();
+        for (Map.Entry<String, String> entry : config.entrySet()) {
+            if 
(entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) {
+                String subKey =
+                        entry.getKey()
+                                .substring(
+                                        
DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length());
+                streamLoadProp.put(subKey, entry.getValue());
+            }
+        }
+        return streamLoadProp;
+    }
 }
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index cd3f455..227a203 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -24,7 +24,11 @@ import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.exception.DorisException;
@@ -47,12 +51,10 @@ public class StringMsgE2ETest extends 
AbstractStringE2ESinkTest {
     public static void setUp() {
         initServer();
         initProducer();
-        initialize();
     }
 
-    public static void initialize() {
-        jsonMsgConnectorContent =
-                
loadContent("src/test/resources/e2e/string_converter/string_msg_connector.json");
+    public static void initialize(String connectorPath) {
+        jsonMsgConnectorContent = loadContent(connectorPath);
         JsonNode rootNode = null;
         try {
             rootNode = objectMapper.readTree(jsonMsgConnectorContent);
@@ -73,6 +75,8 @@ public class StringMsgE2ETest extends 
AbstractStringE2ESinkTest {
 
     @Test
     public void testStringMsg() throws IOException, InterruptedException, 
SQLException {
+        
initialize("src/test/resources/e2e/string_converter/string_msg_connector.json");
+        Thread.sleep(5000);
         String topic = "string_test";
         String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
 
@@ -99,6 +103,49 @@ public class StringMsgE2ETest extends 
AbstractStringE2ESinkTest {
         Assert.assertEquals(12, age);
     }
 
+    @Test
+    public void testGroupCommit() throws Exception {
+
+        
initialize("src/test/resources/e2e/string_converter/group_commit_connector.json");
+        String topic = "group_commit_test";
+        String msg1 = "{\"id\":1,\"name\":\"kafka\",\"age\":12}";
+        String msg2 = "{\"id\":2,\"name\":\"doris\",\"age\":10}";
+
+        produceMsg2Kafka(topic, msg1);
+        produceMsg2Kafka(topic, msg2);
+        String tableSql =
+                
loadContent("src/test/resources/e2e/string_converter/group_commit_tab.sql");
+        createTable(tableSql);
+        kafkaContainerService.registerKafkaConnector(connectorName, 
jsonMsgConnectorContent);
+        Thread.sleep(25000);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        List<String> expected = Arrays.asList("1,kafka,12", "2,doris,10");
+        String query = String.format("select id,name,age from %s.%s order by 
id", database, table);
+        checkResult(expected, query, 3);
+    }
+
+    public void checkResult(List<String> expected, String query, int 
columnSize) throws Exception {
+        List<String> actual = new ArrayList<>();
+
+        try (Statement statement = getJdbcConnection().createStatement()) {
+            ResultSet sinkResultSet = statement.executeQuery(query);
+            while (sinkResultSet.next()) {
+                List<String> row = new ArrayList<>();
+                for (int i = 1; i <= columnSize; i++) {
+                    Object value = sinkResultSet.getObject(i);
+                    if (value == null) {
+                        row.add("null");
+                    } else {
+                        row.add(value.toString());
+                    }
+                }
+                actual.add(StringUtils.join(row, ","));
+            }
+        }
+        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+    }
+
     @AfterClass
     public static void closeInstance() {
         kafkaContainerService.deleteKafkaConnector(connectorName);
diff --git 
a/src/test/resources/e2e/string_converter/group_commit_connector.json 
b/src/test/resources/e2e/string_converter/group_commit_connector.json
new file mode 100644
index 0000000..9e9b55c
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/group_commit_connector.json
@@ -0,0 +1,23 @@
+{
+  "name":"group_commit_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"group_commit_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": "group_commit_test:group_commit_tab",
+    "buffer.count.records":"2",
+    "buffer.flush.time":"120",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"group_commit",
+    "sink.properties.group_commit":"sync_mode",
+    "enable.2pc": "false",
+    "load.model":"stream_load",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/group_commit_tab.sql 
b/src/test/resources/e2e/string_converter/group_commit_tab.sql
new file mode 100644
index 0000000..2ead8f1
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/group_commit_tab.sql
@@ -0,0 +1,13 @@
+-- Please note that the database here should be consistent with doris.database 
in the file where the connector is registered.
+CREATE TABLE group_commit.group_commit_tab (
+  id INT NULL,
+  name VARCHAR(100) NULL,
+  age INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change"="true"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to