This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fcc640e [Fix] When http reports an error, writing will get stuck 
(#539)
1fcc640e is described below

commit 1fcc640ea589094baa7536abcef26fbb6edb4752
Author: wudi <676366...@qq.com>
AuthorDate: Fri Jan 10 12:11:07 2025 +0800

    [Fix] When http reports an error, writing will get stuck (#539)
---
 .../java/org/apache/doris/flink/sink/HttpUtil.java |  1 +
 .../doris/flink/sink/writer/DorisStreamLoad.java   | 26 ++++++++++++++++++++--
 .../doris/flink/sink/writer/LabelGenerator.java    |  8 +++++++
 3 files changed, 33 insertions(+), 2 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 53d3ce13..d1600de6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -60,6 +60,7 @@ public class HttpUtil {
                                         return true;
                                     }
                                 })
+                        .setRetryHandler((exception, executionCount, context) 
-> false)
                         
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
                         .setDefaultRequestConfig(
                                 RequestConfig.custom()
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index f900f741..c6e39326 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -89,6 +89,7 @@ public class DorisStreamLoad implements Serializable {
     private final Properties streamLoadProp;
     private final RecordStream recordStream;
     private volatile Future<CloseableHttpResponse> pendingLoadFuture;
+    private volatile Exception httpException = null;
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
     private boolean loadBatchFirstRecord;
@@ -115,6 +116,10 @@ public class DorisStreamLoad implements Serializable {
         this.streamLoadProp = executionOptions.getStreamLoadProp();
         this.enableDelete = executionOptions.getDeletable();
         this.httpClient = httpClient;
+        String threadName =
+                String.format(
+                        "stream-load-upload-%s-%s",
+                        labelGenerator.getSubtaskId(), 
labelGenerator.getTableIdentifier());
         this.executorService =
                 new ThreadPoolExecutor(
                         1,
@@ -122,7 +127,7 @@ public class DorisStreamLoad implements Serializable {
                         0L,
                         TimeUnit.MILLISECONDS,
                         new LinkedBlockingQueue<>(),
-                        new ExecutorThreadFactory("stream-load-upload"));
+                        new ExecutorThreadFactory(threadName));
         this.recordStream =
                 new RecordStream(
                         executionOptions.getBufferSize(),
@@ -250,6 +255,7 @@ public class DorisStreamLoad implements Serializable {
      * @throws IOException
      */
     public void writeRecord(byte[] record) throws IOException {
+        checkLoadException();
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
         } else if (lineDelimiter != null) {
@@ -258,6 +264,12 @@ public class DorisStreamLoad implements Serializable {
         recordStream.write(record);
     }
 
+    private void checkLoadException() {
+        if (httpException != null) {
+            throw new RuntimeException("Stream load http request error, ", 
httpException);
+        }
+    }
+
     @VisibleForTesting
     public RecordStream getRecordStream() {
         return recordStream;
@@ -347,11 +359,21 @@ public class DorisStreamLoad implements Serializable {
             } else {
                 executeMessage = "table " + table + " start execute load for 
label " + label;
             }
+            Thread mainThread = Thread.currentThread();
             pendingLoadFuture =
                     executorService.submit(
                             () -> {
                                 LOG.info(executeMessage);
-                                return httpClient.execute(putBuilder.build());
+                                try {
+                                    return 
httpClient.execute(putBuilder.build());
+                                } catch (Exception e) {
+                                    LOG.error("Failed to execute load, cause 
", e);
+                                    httpException = e;
+                                    // When an HTTP error occurs, the main 
thread should be
+                                    // interrupted to prevent blocking
+                                    mainThread.interrupt();
+                                    throw e;
+                                }
                             });
         } catch (Exception e) {
             String err;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index d80315f5..84c14b79 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -91,4 +91,12 @@ public class LabelGenerator {
         String concatPrefix = String.format("%s_%s_%s", labelPrefix, 
tableIdentifier, subtaskId);
         return concatPrefix;
     }
+
+    public int getSubtaskId() {
+        return subtaskId;
+    }
+
+    public String getTableIdentifier() {
+        return tableIdentifier;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to