This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch branch-for-flink-before-1.13 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push: new 31e10ed6 [Pick] Pick load banlance on each backend (#565) 31e10ed6 is described below commit 31e10ed66adbc4e1395806ed2df368645501e3ec Author: wudi <676366...@qq.com> AuthorDate: Mon Mar 24 18:06:53 2025 +0800 [Pick] Pick load banlance on each backend (#565) --- .../doris/flink/table/DorisDynamicOutputFormat.java | 16 ++++++++++++---- .../org/apache/doris/flink/table/DorisStreamLoad.java | 3 ++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index cb0a3d21..03a26dbf 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -91,6 +91,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { private String keysType; private List<BackendV2.BackendRowV2> backends; private long pos = 0L; + private int subtaskId = 0; private transient volatile boolean closed = false; private transient ScheduledExecutorService scheduler; private transient ScheduledFuture<?> scheduledFuture; @@ -191,16 +192,17 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { @Override public void open(int taskNumber, int numTasks) throws IOException { + this.subtaskId = taskNumber; this.backends = settingBackends(); + String backend = getAvailableBackend(); dorisStreamLoad = new DorisStreamLoad( - backends.get(0).toBackendString(), + backend, options.getTableIdentifier().split("\\.")[0], options.getTableIdentifier().split("\\.")[1], options.getUsername(), options.getPassword(), executionOptions.getStreamLoadProp(), readOptions); - LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr()); if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output" + @@ -321,6 +323,10 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } else { result = String.join(this.lineDelimiter, batch); } + + // refresh backend + dorisStreamLoad.setHostPort(getAvailableBackend()); + for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { dorisStreamLoad.load(result); @@ -334,7 +340,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } try { dorisStreamLoad.setHostPort(getAvailableBackend()); - LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e); + LOG.warn("stream load error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e); Thread.sleep(1000L * ( i + 1 )); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); @@ -342,8 +348,10 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } } } + } + @Deprecated private String getBackend() throws IOException { try { //get be url from fe @@ -371,7 +379,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { long tmp = pos + backends.size(); while (pos < tmp) { BackendV2.BackendRowV2 backend = - backends.get((int) (pos % backends.size())); + backends.get((int) ((pos + subtaskId) % backends.size())); pos++; return backend.toBackendString(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index 1ef8e20b..72cf8ffc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -108,7 +108,7 @@ public class DorisStreamLoad implements Serializable { public void load(String value) throws StreamLoadException { LoadResponse loadResponse = loadBatch(value); - LOG.info("Streamload Response:{}", loadResponse); + LOG.info("stream load response:{}", loadResponse); if (loadResponse.status != 200) { throw new StreamLoadException("stream load error: " + loadResponse.respContent); } else { @@ -133,6 +133,7 @@ public class DorisStreamLoad implements Serializable { UUID.randomUUID().toString().replaceAll("-", "")); } + LOG.info("stream load started for {} on host {}", label, hostPort); try { HttpPut put = new HttpPut(loadUrlStr); put.setHeader(HttpHeaders.EXPECT, "100-continue"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org