This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new ba21aabd [improve]improve group commit logic (#413)
ba21aabd is described below

commit ba21aabd0be664141e44581c5d16d97e9ff5467f
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Tue Jul 2 11:26:40 2024 +0800

    [improve]improve group commit logic (#413)
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 14 +++++-
 .../doris/flink/sink/writer/DorisStreamLoad.java   | 44 +++++++++++++++----
 .../doris/flink/sink/writer/LoadConstants.java     |  1 +
 .../apache/doris/flink/sink/DorisSinkITCase.java   | 51 ++++++++++++++++++++++
 4 files changed, 99 insertions(+), 11 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index fbc6daa0..375e4335 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -68,6 +68,7 @@ import static 
org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
+import static 
org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -122,7 +123,11 @@ public class DorisBatchStreamLoad implements Serializable {
                                             LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT))
                             .getBytes();
         }
-        this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT);
+        this.enableGroupCommit =
+                loadProps.containsKey(GROUP_COMMIT)
+                        && !loadProps
+                                .getProperty(GROUP_COMMIT)
+                                .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
         this.executionOptions = executionOptions;
         this.flushQueue = new 
LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
         if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
@@ -283,7 +288,12 @@ public class DorisBatchStreamLoad implements Serializable {
             Throwable resEx = new Throwable();
             int retry = 0;
             while (retry <= executionOptions.getMaxRetries()) {
-                LOG.info("stream load started for {} on host {}", label, 
hostPort);
+                if (enableGroupCommit) {
+                    LOG.info("stream load started with group commit on host 
{}", hostPort);
+                } else {
+                    LOG.info("stream load started for {} on host {}", label, 
hostPort);
+                }
+
                 try (CloseableHttpClient httpClient = 
httpClientBuilder.build()) {
                     try (CloseableHttpResponse response = 
httpClient.execute(putBuilder.build())) {
                         int statusCode = 
response.getStatusLine().getStatusCode();
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 676de3df..4cbcb431 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -59,6 +59,7 @@ import static 
org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
+import static 
org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -131,7 +132,11 @@ public class DorisStreamLoad implements Serializable {
                                             LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT))
                             .getBytes();
         }
-        enableGroupCommit = streamLoadProp.containsKey(GROUP_COMMIT);
+        this.enableGroupCommit =
+                streamLoadProp.containsKey(GROUP_COMMIT)
+                        && !streamLoadProp
+                                .getProperty(GROUP_COMMIT)
+                                .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
         loadBatchFirstRecord = true;
     }
 
@@ -263,7 +268,16 @@ public class DorisStreamLoad implements Serializable {
 
     public RespContent stopLoad() throws IOException {
         recordStream.endInput();
-        LOG.info("table {} stream load stopped for {} on host {}", table, 
currentLabel, hostPort);
+        if (enableGroupCommit) {
+            LOG.info("table {} stream load stopped with group commit on host 
{}", table, hostPort);
+        } else {
+            LOG.info(
+                    "table {} stream load stopped for {} on host {}",
+                    table,
+                    currentLabel,
+                    hostPort);
+        }
+
         Preconditions.checkState(pendingLoadFuture != null);
         try {
             return handlePreCommitResponse(pendingLoadFuture.get());
@@ -285,7 +299,11 @@ public class DorisStreamLoad implements Serializable {
         loadBatchFirstRecord = !isResume;
         HttpPutBuilder putBuilder = new HttpPutBuilder();
         recordStream.startInput(isResume);
-        LOG.info("table {} stream load started for {} on host {}", table, 
label, hostPort);
+        if (enableGroupCommit) {
+            LOG.info("table {} stream load started with group commit on host 
{}", table, hostPort);
+        } else {
+            LOG.info("table {} stream load started for {} on host {}", table, 
label, hostPort);
+        }
         this.currentLabel = label;
         try {
             InputStreamEntity entity = new InputStreamEntity(recordStream);
@@ -300,18 +318,26 @@ public class DorisStreamLoad implements Serializable {
             if (enable2PC) {
                 putBuilder.enable2PC();
             }
-            String finalLabel = label;
+
+            String executeMessage;
+            if (enableGroupCommit) {
+                executeMessage = "table " + table + " start execute load with 
group commit";
+            } else {
+                executeMessage = "table " + table + " start execute load for 
label " + label;
+            }
             pendingLoadFuture =
                     executorService.submit(
                             () -> {
-                                LOG.info(
-                                        "table {} start execute load for label 
{}",
-                                        table,
-                                        finalLabel);
+                                LOG.info(executeMessage);
                                 return httpClient.execute(putBuilder.build());
                             });
         } catch (Exception e) {
-            String err = "failed to stream load data with label: " + label;
+            String err;
+            if (enableGroupCommit) {
+                err = "failed to stream load data with group commit";
+            } else {
+                err = "failed to stream load data with label: " + label;
+            }
             LOG.warn(err, e);
             throw e;
         }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 2a79b736..e8cd87e6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -32,4 +32,5 @@ public class LoadConstants {
     public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
     public static final String READ_JSON_BY_LINE = "read_json_by_line";
     public static final String GROUP_COMMIT = "group_commit";
+    public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 6fc2da7e..91077ea2 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -58,6 +58,7 @@ public class DorisSinkITCase extends DorisTestBase {
     static final String TABLE_JSON_TBL = "tbl_json_tbl";
     static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
     static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
+    static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
     static final String TABLE_CSV_JM = "tbl_csv_jm";
     static final String TABLE_CSV_TM = "tbl_csv_tm";
 
@@ -264,6 +265,56 @@ public class DorisSinkITCase extends DorisTestBase {
         checkResult(expected, query, 2);
     }
 
+    @Test
+    public void testTableGroupCommit() throws Exception {
+        initializeTable(TABLE_GROUP_COMMIT);
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_group_commit_sink ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'sink.label-prefix' = '"
+                                + UUID.randomUUID()
+                                + "',"
+                                + " 'sink.properties.column_separator' = 
'\\x01',"
+                                + " 'sink.properties.line_delimiter' = 
'\\x02',"
+                                + " 'sink.properties.group_commit' = 
'sync_mode',"
+                                + " 'sink.ignore.update-before' = 'false',"
+                                + " 'sink.enable.batch-mode' = 'true',"
+                                + " 'sink.enable-delete' = 'true',"
+                                + " 'sink.flush.queue-size' = '2',"
+                                + " 'sink.buffer-flush.max-rows' = '3',"
+                                + " 'sink.buffer-flush.max-bytes' = '5000',"
+                                + " 'sink.buffer-flush.interval' = '10s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_GROUP_COMMIT,
+                        USERNAME,
+                        PASSWORD);
+        tEnv.executeSql(sinkDDL);
+        tEnv.executeSql(
+                "INSERT INTO doris_group_commit_sink SELECT 'doris',1 union 
all  SELECT 'group_commit',2 union all  SELECT 'flink',3");
+
+        Thread.sleep(25000);
+        List<String> expected = Arrays.asList("doris,1", "flink,3", 
"group_commit,2");
+        String query =
+                String.format(
+                        "select name,age from %s.%s order by 1", DATABASE, 
TABLE_GROUP_COMMIT);
+        // NOTE(review): checkResult presumably runs the query and compares rows with expected; 2 looks like the column count - confirm
+        checkResult(expected, query, 2);
+    }
+
     @Test
     public void testJobManagerFailoverSink() throws Exception {
         initializeFailoverTable(TABLE_CSV_JM);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to