This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 43a055a9 [improve] support group commit (#412)
43a055a9 is described below

commit 43a055a9e6b4c728912725976eedbecdfb8b270c
Author: wudi <676366...@qq.com>
AuthorDate: Mon Jul 1 10:31:15 2024 +0800

    [improve] support group commit (#412)
---
 .../java/org/apache/doris/flink/sink/HttpPutBuilder.java     |  4 +++-
 .../apache/doris/flink/sink/batch/DorisBatchStreamLoad.java  |  6 ++++++
 .../org/apache/doris/flink/sink/writer/DorisStreamLoad.java  | 12 +++++++++++-
 .../org/apache/doris/flink/sink/writer/LoadConstants.java    |  1 +
 .../apache/doris/flink/sink/copy/TestDorisCopyWriter.java    |  3 +++
 5 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
index 023cd31a..44f6c9fe 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
@@ -111,7 +111,9 @@ public class HttpPutBuilder {
     }
 
     public HttpPutBuilder setLabel(String label) {
-        header.put("label", label);
+        if (label != null) {
+            header.put("label", label);
+        }
         return this;
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index d9fba749..fbc6daa0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -67,6 +67,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -95,6 +96,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
     private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch();
     private BackendUtil backendUtil;
+    private boolean enableGroupCommit;
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -120,6 +122,7 @@ public class DorisBatchStreamLoad implements Serializable {
                                             LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
                             .getBytes();
         }
+        this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT);
         this.executionOptions = executionOptions;
         this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
         if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
@@ -260,6 +263,9 @@ public class DorisBatchStreamLoad implements Serializable {
 
         /** execute stream load. */
         public void load(String label, BatchRecordBuffer buffer) throws IOException {
+            if (enableGroupCommit) {
+                label = null;
+            }
             refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
             ByteBuffer data = buffer.getData();
             ByteArrayEntity entity =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 14e44dee..676de3df 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -58,6 +58,7 @@ import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -87,6 +88,7 @@ public class DorisStreamLoad implements Serializable {
     private final ExecutorService executorService;
     private boolean loadBatchFirstRecord;
     private volatile String currentLabel;
+    private boolean enableGroupCommit;
 
     public DorisStreamLoad(
             String hostPort,
@@ -129,6 +131,7 @@ public class DorisStreamLoad implements Serializable {
                                             LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
                             .getBytes();
         }
+        enableGroupCommit = streamLoadProp.containsKey(GROUP_COMMIT);
         loadBatchFirstRecord = true;
     }
 
@@ -276,6 +279,9 @@ public class DorisStreamLoad implements Serializable {
      * @throws IOException
      */
     public void startLoad(String label, boolean isResume) throws IOException {
+        if (enableGroupCommit) {
+            label = null;
+        }
         loadBatchFirstRecord = !isResume;
         HttpPutBuilder putBuilder = new HttpPutBuilder();
         recordStream.startInput(isResume);
@@ -294,10 +300,14 @@ public class DorisStreamLoad implements Serializable {
             if (enable2PC) {
                 putBuilder.enable2PC();
             }
+            String finalLabel = label;
             pendingLoadFuture =
                     executorService.submit(
                             () -> {
-                                LOG.info("table {} start execute load for label {}", table, label);
+                                LOG.info(
+                                        "table {} start execute load for label {}",
+                                        table,
+                                        finalLabel);
                                 return httpClient.execute(putBuilder.build());
                             });
         } catch (Exception e) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 2e5d29a5..2a79b736 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -31,4 +31,5 @@ public class LoadConstants {
     public static final String NULL_VALUE = "\\N";
     public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
     public static final String READ_JSON_BY_LINE = "read_json_by_line";
+    public static final String GROUP_COMMIT = "group_commit";
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
index e31e9179..c36805ba 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
@@ -32,7 +32,9 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
+import org.junit.runners.MethodSorters;
 
 import java.util.Collection;
 import java.util.List;
@@ -41,6 +43,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class TestDorisCopyWriter {
 
     DorisOptions dorisOptions;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to