This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 4ace2e6 [feature]support stream load with group commit mode (#35) 4ace2e6 is described below commit 4ace2e65a7c0bf383c14010e7a4823e5fd5e0aaf Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Jul 9 15:13:16 2024 +0800 [feature]support stream load with group commit mode (#35) --- .../doris/kafka/connector/cfg/DorisOptions.java | 27 +++++++---- .../kafka/connector/utils/ConfigCheckUtils.java | 27 +++++++++++ .../kafka/connector/writer/LoadConstants.java | 4 ++ .../connector/writer/load/DorisStreamLoad.java | 23 +++++++-- .../GroupCommitMode.java} | 32 ++++++++++--- .../cfg/TestDorisSinkConnectorConfig.java | 52 ++++++++++++++++++++ .../e2e/sink/stringconverter/StringMsgE2ETest.java | 55 ++++++++++++++++++++-- .../string_converter/group_commit_connector.json | 23 +++++++++ .../e2e/string_converter/group_commit_tab.sql | 13 +++++ 9 files changed, 234 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 69cbb80..e8c1933 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -55,8 +55,9 @@ public class DorisOptions { private boolean autoRedirect = true; private int requestReadTimeoutMs; private int requestConnectTimeoutMs; + private boolean enableGroupCommit; /** Properties for the StreamLoad. 
*/ - private final Properties streamLoadProp = new Properties(); + private final Properties streamLoadProp; @Deprecated private String labelPrefix; private final String databaseTimeZone; @@ -113,25 +114,31 @@ public class DorisOptions { this.requestReadTimeoutMs = Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS)); } - getStreamLoadPropFromConfig(config); + this.streamLoadProp = getStreamLoadPropFromConfig(config); + this.enableGroupCommit = + ConfigCheckUtils.validateGroupCommitMode(getStreamLoadProp(), enable2PC()); } - private void getStreamLoadPropFromConfig(Map<String, String> config) { - setStreamLoadDefaultValues(); + private Properties getStreamLoadPropFromConfig(Map<String, String> config) { + Properties properties = new Properties(); + properties.putAll(getStreamLoadDefaultValues()); for (Map.Entry<String, String> entry : config.entrySet()) { if (entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) { String subKey = entry.getKey() .substring( DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length()); - streamLoadProp.put(subKey, entry.getValue()); + properties.put(subKey, entry.getValue()); } } + return properties; } - private void setStreamLoadDefaultValues() { - streamLoadProp.setProperty("format", "json"); - streamLoadProp.setProperty("read_json_by_line", "true"); + private Properties getStreamLoadDefaultValues() { + Properties properties = new Properties(); + properties.setProperty("format", "json"); + properties.setProperty("read_json_by_line", "true"); + return properties; } public String getName() { @@ -182,6 +189,10 @@ public class DorisOptions { return enable2PC; } + public boolean enableGroupCommit() { + return enableGroupCommit; + } + public Map<String, String> getTopicMap() { return topicMap; } diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java index ca8ec31..51b8b06 100644 --- 
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java @@ -19,8 +19,11 @@ package org.apache.doris.kafka.connector.utils; +import static org.apache.doris.kafka.connector.writer.LoadConstants.PARTIAL_COLUMNS; + import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.regex.Pattern; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.converter.ConverterMode; @@ -28,6 +31,8 @@ import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode; import org.apache.doris.kafka.connector.exception.ArgumentsException; import org.apache.doris.kafka.connector.exception.DorisException; import org.apache.doris.kafka.connector.writer.DeliveryGuarantee; +import org.apache.doris.kafka.connector.writer.LoadConstants; +import org.apache.doris.kafka.connector.writer.load.GroupCommitMode; import org.apache.doris.kafka.connector.writer.load.LoadModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -293,4 +298,26 @@ public class ConfigCheckUtils { } return false; } + + public static boolean validateGroupCommitMode(Properties streamLoadProp, boolean enable2PC) { + if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) { + return false; + } + + Object value = streamLoadProp.get(LoadConstants.GROUP_COMMIT); + String normalizedValue = value.toString().trim().toLowerCase(); + if (!GroupCommitMode.instances().contains(normalizedValue)) { + throw new DorisException( + "The value of group commit mode is an illegal parameter, illegal value=" + + value); + } else if (enable2PC) { + throw new DorisException( + "When group commit is enabled, you should disable two phase commit! 
Please set 'enable.2pc':'false'"); + } else if (streamLoadProp.containsKey(PARTIAL_COLUMNS) + && streamLoadProp.get(PARTIAL_COLUMNS).equals("true")) { + throw new DorisException( + "When group commit is enabled,you can not load data with partial column update."); + } + return true; + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java index 598cc3a..3e887eb 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java @@ -26,4 +26,8 @@ public class LoadConstants { // Special identifier, label separator used for kafka-connect sink data public static final String FILE_DELIM_DEFAULT = "__KC_"; + + // since apache doris 2.1.0, support stream load with group commit mode. + public static final String GROUP_COMMIT = "group_commit"; + public static final String PARTIAL_COLUMNS = "partial_columns"; } diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java index b26a735..6d3f770 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java @@ -53,6 +53,7 @@ public class DorisStreamLoad extends DataLoad { private final CloseableHttpClient httpClient = new HttpUtils().getHttpClient(); private final BackendUtils backendUtils; private Queue<KafkaRespContent> respContents = new LinkedList<>(); + private final boolean enableGroupCommit; public DorisStreamLoad(BackendUtils backendUtils, DorisOptions dorisOptions, String topic) { this.database = dorisOptions.getDatabase(); @@ -63,10 +64,16 @@ public class DorisStreamLoad extends DataLoad { this.dorisOptions = dorisOptions; this.backendUtils = backendUtils; this.topic = topic; + 
this.enableGroupCommit = dorisOptions.enableGroupCommit(); } /** execute stream load. */ public void load(String label, RecordBuffer buffer) throws IOException { + + if (enableGroupCommit) { + label = null; + } + refreshLoadUrl(database, table); String data = buffer.getData(); ByteArrayEntity entity = new ByteArrayEntity(data.getBytes(StandardCharsets.UTF_8)); @@ -81,7 +88,12 @@ public class DorisStreamLoad extends DataLoad { .enable2PC(dorisOptions.enable2PC()) .addProperties(dorisOptions.getStreamLoadProp()); - LOG.info("stream load started for {} on host {}", label, hostPort); + if (enableGroupCommit) { + LOG.info("stream load started with group commit on host {}", hostPort); + } else { + LOG.info("stream load started for {} on host {}", label, hostPort); + } + try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) { int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == 200 && response.getEntity() != null) { @@ -101,10 +113,15 @@ public class DorisStreamLoad extends DataLoad { respContent.setLastOffset(buffer.getLastOffset()); respContent.setTopic(topic); respContents.add(respContent); - return; } } catch (Exception ex) { - String err = "failed to stream load data with label: " + label; + String err; + if (enableGroupCommit) { + err = "failed to stream load data with group commit"; + } else { + err = "failed to stream load data with label: " + label; + } + LOG.warn(err, ex); throw new StreamLoadException(err, ex); } diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java similarity index 57% copy from src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java copy to src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java index 598cc3a..869b940 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/LoadConstants.java +++ 
b/src/main/java/org/apache/doris/kafka/connector/writer/load/GroupCommitMode.java @@ -17,13 +17,31 @@ * under the License. */ -package org.apache.doris.kafka.connector.writer; +package org.apache.doris.kafka.connector.writer.load; -public class LoadConstants { - public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__"; - public static final String DORIS_DEL_TRUE = "1"; - public static final String DORIS_DEL_FALSE = "0"; +import java.util.Arrays; +import java.util.List; - // Special identifier, label separator used for kafka-connect sink data - public static final String FILE_DELIM_DEFAULT = "__KC_"; +public enum GroupCommitMode { + OFF_MODE("off_mode"), + SYNC_MODE("sync_mode"), + ASYNC_MODE("async_mode"); + + private final String name; + + GroupCommitMode(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static LoadModel of(String name) { + return LoadModel.valueOf(name.toUpperCase()); + } + + public static List<String> instances() { + return Arrays.asList(OFF_MODE.name, SYNC_MODE.name, ASYNC_MODE.name); + } } diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java index eae3ec0..8f44707 100644 --- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java +++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.doris.kafka.connector.exception.DorisException; import org.apache.doris.kafka.connector.utils.ConfigCheckUtils; import org.junit.Assert; @@ -290,4 +291,55 @@ public class TestDorisSinkConnectorConfig { Assert.assertTrue(convertConfig.containsKey(s)); } } + + @Test(expected = DorisException.class) + public void testGroupCommitWithIllegalParams() { + 
Map<String, String> config = getConfig(); + config.put("sink.properties.group_commit", "sync_modes"); + Properties streamLoadProp = getStreamLoadPropFromConfig(config); + config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false"); + ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false); + } + + @Test(expected = DorisException.class) + public void testGroupCommitModeWithEnable2pc() { + Map<String, String> config = getConfig(); + config.put("sink.properties.group_commit", "sync_mode"); + Properties streamLoadProp = getStreamLoadPropFromConfig(config); + boolean enable2pc = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC)); + ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, enable2pc); + } + + @Test(expected = DorisException.class) + public void testGroupCommitWithPartialUpdate() { + Map<String, String> config = getConfig(); + config.put("sink.properties.group_commit", "sync_mode"); + config.put("sink.properties.partial_columns", "true"); + config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false"); + Properties streamLoadProp = getStreamLoadPropFromConfig(config); + ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false); + } + + @Test + public void testGroupCommitWithAsyncMode() { + Map<String, String> config = getConfig(); + config.put("sink.properties.group_commit", "async_mode"); + Properties streamLoadProp = getStreamLoadPropFromConfig(config); + config.put(DorisSinkConnectorConfig.ENABLE_2PC, "false"); + ConfigCheckUtils.validateGroupCommitMode(streamLoadProp, false); + } + + private Properties getStreamLoadPropFromConfig(Map<String, String> config) { + Properties streamLoadProp = new Properties(); + for (Map.Entry<String, String> entry : config.entrySet()) { + if (entry.getKey().startsWith(DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX)) { + String subKey = + entry.getKey() + .substring( + DorisSinkConnectorConfig.STREAM_LOAD_PROP_PREFIX.length()); + streamLoadProp.put(subKey, entry.getValue()); + } + } + return 
streamLoadProp; + } } diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index cd3f455..227a203 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -24,7 +24,11 @@ import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.exception.DorisException; @@ -47,12 +51,10 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { public static void setUp() { initServer(); initProducer(); - initialize(); } - public static void initialize() { - jsonMsgConnectorContent = - loadContent("src/test/resources/e2e/string_converter/string_msg_connector.json"); + public static void initialize(String connectorPath) { + jsonMsgConnectorContent = loadContent(connectorPath); JsonNode rootNode = null; try { rootNode = objectMapper.readTree(jsonMsgConnectorContent); @@ -73,6 +75,8 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { @Test public void testStringMsg() throws IOException, InterruptedException, SQLException { + initialize("src/test/resources/e2e/string_converter/string_msg_connector.json"); + Thread.sleep(5000); String topic = "string_test"; String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}"; @@ -99,6 +103,49 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { Assert.assertEquals(12, age); } + @Test + public void testGroupCommit() throws Exception { + + 
initialize("src/test/resources/e2e/string_converter/group_commit_connector.json"); + String topic = "group_commit_test"; + String msg1 = "{\"id\":1,\"name\":\"kafka\",\"age\":12}"; + String msg2 = "{\"id\":2,\"name\":\"doris\",\"age\":10}"; + + produceMsg2Kafka(topic, msg1); + produceMsg2Kafka(topic, msg2); + String tableSql = + loadContent("src/test/resources/e2e/string_converter/group_commit_tab.sql"); + createTable(tableSql); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + Thread.sleep(25000); + + String table = dorisOptions.getTopicMapTable(topic); + List<String> expected = Arrays.asList("1,kafka,12", "2,doris,10"); + String query = String.format("select id,name,age from %s.%s order by id", database, table); + checkResult(expected, query, 3); + } + + public void checkResult(List<String> expected, String query, int columnSize) throws Exception { + List<String> actual = new ArrayList<>(); + + try (Statement statement = getJdbcConnection().createStatement()) { + ResultSet sinkResultSet = statement.executeQuery(query); + while (sinkResultSet.next()) { + List<String> row = new ArrayList<>(); + for (int i = 1; i <= columnSize; i++) { + Object value = sinkResultSet.getObject(i); + if (value == null) { + row.add("null"); + } else { + row.add(value.toString()); + } + } + actual.add(StringUtils.join(row, ",")); + } + } + Assert.assertArrayEquals(expected.toArray(), actual.toArray()); + } + @AfterClass public static void closeInstance() { kafkaContainerService.deleteKafkaConnector(connectorName); diff --git a/src/test/resources/e2e/string_converter/group_commit_connector.json b/src/test/resources/e2e/string_converter/group_commit_connector.json new file mode 100644 index 0000000..9e9b55c --- /dev/null +++ b/src/test/resources/e2e/string_converter/group_commit_connector.json @@ -0,0 +1,23 @@ +{ + "name":"group_commit_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + 
"topics":"group_commit_test", + "tasks.max":"1", + "doris.topic2table.map": "group_commit_test:group_commit_tab", + "buffer.count.records":"2", + "buffer.flush.time":"120", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"group_commit", + "sink.properties.group_commit":"sync_mode", + "enable.2pc": "false", + "load.model":"stream_load", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/group_commit_tab.sql b/src/test/resources/e2e/string_converter/group_commit_tab.sql new file mode 100644 index 0000000..2ead8f1 --- /dev/null +++ b/src/test/resources/e2e/string_converter/group_commit_tab.sql @@ -0,0 +1,13 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. +CREATE TABLE group_commit.group_commit_tab ( + id INT NULL, + name VARCHAR(100) NULL, + age INT NULL +) ENGINE=OLAP +UNIQUE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1", +"light_schema_change"="true" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org