This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 
by this push:
     new 31e10ed6 [Pick] Pick load banlance on each backend (#565)
31e10ed6 is described below

commit 31e10ed66adbc4e1395806ed2df368645501e3ec
Author: wudi <676366...@qq.com>
AuthorDate: Mon Mar 24 18:06:53 2025 +0800

    [Pick] Pick load banlance on each backend (#565)
---
 .../doris/flink/table/DorisDynamicOutputFormat.java      | 16 ++++++++++++----
 .../org/apache/doris/flink/table/DorisStreamLoad.java    |  3 ++-
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index cb0a3d21..03a26dbf 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -91,6 +91,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
     private String keysType;
     private List<BackendV2.BackendRowV2> backends;
     private long pos = 0L;
+    private int subtaskId = 0;
     private transient volatile boolean closed = false;
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
@@ -191,16 +192,17 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
+        this.subtaskId = taskNumber;
         this.backends = settingBackends();
+        String backend = getAvailableBackend();
         dorisStreamLoad = new DorisStreamLoad(
-                backends.get(0).toBackendString(),
+                backend,
                 options.getTableIdentifier().split("\\.")[0],
                 options.getTableIdentifier().split("\\.")[1],
                 options.getUsername(),
                 options.getPassword(),
                 executionOptions.getStreamLoadProp(),
                 readOptions);
-        LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
 
         if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
             this.scheduler = Executors.newScheduledThreadPool(1, new 
ExecutorThreadFactory("doris-streamload-output" +
@@ -321,6 +323,10 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         } else {
             result = String.join(this.lineDelimiter, batch);
         }
+
+        // refresh backend
+        dorisStreamLoad.setHostPort(getAvailableBackend());
+
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
                 dorisStreamLoad.load(result);
@@ -334,7 +340,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
                 }
                 try {
                     dorisStreamLoad.setHostPort(getAvailableBackend());
-                    LOG.warn("streamload error,switch be: {}", 
dorisStreamLoad.getLoadUrlStr(), e);
+                    LOG.warn("stream load error,switch be: {}", 
dorisStreamLoad.getLoadUrlStr(), e);
                     Thread.sleep(1000L * ( i + 1 ));
                 } catch (InterruptedException ex) {
                     Thread.currentThread().interrupt();
@@ -342,8 +348,10 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
                 }
             }
         }
+
     }
 
+    @Deprecated
     private String getBackend() throws IOException {
         try {
             //get be url from fe
@@ -371,7 +379,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         long tmp = pos + backends.size();
         while (pos < tmp) {
             BackendV2.BackendRowV2 backend =
-                    backends.get((int) (pos % backends.size()));
+                    backends.get((int) ((pos + subtaskId) % backends.size()));
             pos++;
             return backend.toBackendString();
         }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index 1ef8e20b..72cf8ffc 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -108,7 +108,7 @@ public class DorisStreamLoad implements Serializable {
 
     public void load(String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(value);
-        LOG.info("Streamload Response:{}", loadResponse);
+        LOG.info("stream load response:{}", loadResponse);
         if (loadResponse.status != 200) {
             throw new StreamLoadException("stream load error: " + 
loadResponse.respContent);
         } else {
@@ -133,6 +133,7 @@ public class DorisStreamLoad implements Serializable {
                     UUID.randomUUID().toString().replaceAll("-", ""));
         }
 
+        LOG.info("stream load started for {} on host {}", label, hostPort);
         try {
             HttpPut put = new HttpPut(loadUrlStr);
             put.setHeader(HttpHeaders.EXPECT, "100-continue");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to