This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new acf6e45  reduce streamload connection (#133)
acf6e45 is described below

commit acf6e4582d8e651eff6174d05aa56f7681434645
Author: wudi <676366...@qq.com>
AuthorDate: Wed Apr 26 13:51:38 2023 +0800

    reduce streamload connection (#133)
    
    Co-authored-by: wudi <>
---
 .../org/apache/doris/flink/sink/writer/DorisWriter.java     | 13 +++++++++----
 .../org/apache/doris/flink/sink/writer/LabelGenerator.java  |  4 +++-
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index ac8fdaf..8ba2050 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -123,7 +123,6 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
         // get main work thread.
         executorThread = Thread.currentThread();
         this.currentLabel = labelGenerator.generateLabel(lastCheckpointId + 1);
-        dorisStreamLoad.startLoad(currentLabel);
         // when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
     }
@@ -131,6 +130,11 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
     @Override
     public void write(IN in, Context context) throws IOException {
         checkLoadException();
+        if(!loading) {
+            //Start streamload only when there has data
+            dorisStreamLoad.startLoad(currentLabel);
+            loading = true;
+        }
         byte[] serialize = serializer.serialize(in);
         if(Objects.isNull(serialize)){
             return;
@@ -140,6 +144,10 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
 
     @Override
     public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {
+        if(!loading){
+            //There is no data during the entire checkpoint period
+            return Collections.emptyList();
+        }
         // disable exception checker before stop load.
         loading = false;
         Preconditions.checkState(dorisStreamLoad != null);
@@ -152,7 +160,6 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
             return Collections.emptyList();
         }
         long txnId = respContent.getTxnId();
-
         return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
     }
 
@@ -162,8 +169,6 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
         // dynamic refresh BE node
         this.dorisStreamLoad.setHostPort(getAvailableBackend());
         this.currentLabel = labelGenerator.generateLabel(checkpointId + 1);
-        this.dorisStreamLoad.startLoad(currentLabel);
-        this.loading = true;
         return Collections.singletonList(dorisWriterState);
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 436d709..d31e777 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -16,6 +16,8 @@
 // under the License.
 package org.apache.doris.flink.sink.writer;
 
+import java.util.UUID;
+
 /**
  * Generator label for stream load.
  */
@@ -29,6 +31,6 @@ public class LabelGenerator {
     }
 
     public String generateLabel(long chkId) {
-        return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" + System.currentTimeMillis();
+        return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" + UUID.randomUUID();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to