This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new acf6e45 reduce streamload connection (#133) acf6e45 is described below commit acf6e4582d8e651eff6174d05aa56f7681434645 Author: wudi <676366...@qq.com> AuthorDate: Wed Apr 26 13:51:38 2023 +0800 reduce streamload connection (#133) Co-authored-by: wudi <> --- .../org/apache/doris/flink/sink/writer/DorisWriter.java | 13 +++++++++---- .../org/apache/doris/flink/sink/writer/LabelGenerator.java | 4 +++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index ac8fdaf..8ba2050 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -123,7 +123,6 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr // get main work thread. executorThread = Thread.currentThread(); this.currentLabel = labelGenerator.generateLabel(lastCheckpointId + 1); - dorisStreamLoad.startLoad(currentLabel); // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); } @@ -131,6 +130,11 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr @Override public void write(IN in, Context context) throws IOException { checkLoadException(); + if(!loading) { + //Start streamload only when there has data + dorisStreamLoad.startLoad(currentLabel); + loading = true; + } byte[] serialize = serializer.serialize(in); if(Objects.isNull(serialize)){ return; @@ -140,6 +144,10 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr @Override public List<DorisCommittable> prepareCommit(boolean flush) throws IOException { + if(!loading){ + //There is no data during the entire checkpoint period + return Collections.emptyList(); + } // disable exception checker before stop load. loading = false; Preconditions.checkState(dorisStreamLoad != null); @@ -152,7 +160,6 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr return Collections.emptyList(); } long txnId = respContent.getTxnId(); - return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); } @@ -162,8 +169,6 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr // dynamic refresh BE node this.dorisStreamLoad.setHostPort(getAvailableBackend()); this.currentLabel = labelGenerator.generateLabel(checkpointId + 1); - this.dorisStreamLoad.startLoad(currentLabel); - this.loading = true; return Collections.singletonList(dorisWriterState); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java index 436d709..d31e777 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java @@ -16,6 +16,8 @@ // under the License. package org.apache.doris.flink.sink.writer; +import java.util.UUID; + /** * Generator label for stream load. */ @@ -29,6 +31,6 @@ public class LabelGenerator { } public String generateLabel(long chkId) { - return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" + System.currentTimeMillis(); + return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" + UUID.randomUUID(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org