This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ecb23b  [Fix] Fixed the problem that retry may get stuck when fe hangs up (#331)
6ecb23b is described below

commit 6ecb23b69a2e39d9a8fc0556b0716bdc6fec7851
Author: wudi <[email protected]>
AuthorDate: Tue Jul 22 17:57:37 2025 +0800

    [Fix] Fixed the problem that retry may get stuck when fe hangs up (#331)
---
 .../doris/spark/client/DorisBackendHttpClient.java | 10 +++-
 .../doris/spark/client/DorisFrontendClient.java    |  8 +++-
 .../apache/doris/spark/client/entity/Backend.java  |  4 ++
 .../client/write/AbstractStreamLoadProcessor.java  |  3 +-
 .../doris/spark/client/write/DorisWriter.java      |  7 ++-
 .../java/org/apache/doris/spark/util/HttpUtil.java | 56 ++++++++++++++++++++++
 .../apache/doris/spark/write/DorisDataWriter.scala |  4 +-
 7 files changed, 84 insertions(+), 8 deletions(-)

diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
index 54b5659..1f28ae5 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
@@ -18,6 +18,7 @@
 package org.apache.doris.spark.client;
 
 import org.apache.doris.spark.client.entity.Backend;
+import org.apache.doris.spark.util.HttpUtil;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.slf4j.Logger;
@@ -46,15 +47,22 @@ public class DorisBackendHttpClient implements Serializable 
{
         Exception ex = null;
         for (Backend backend : backends) {
             try {
-                return reqFunc.apply(backend, httpClient);
+                if(HttpUtil.tryHttpConnection(backend.hostHttpPortString())){
+                    return reqFunc.apply(backend, httpClient);
+                }
             } catch (Exception e) {
                 log.warn("Failed to execute request on backend: {}:{}", 
backend.getHost(), backend.getHttpPort(), e);
                 ex = e;
             }
         }
+
+        if (ex == null) {
+            ex = new Exception("All backends failed to execute request.");
+        }
         throw ex;
     }
 
+
     public void close() {
         if (httpClient != null) {
             try {
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index cefb063..b0d1dc1 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -26,6 +26,7 @@ import 
org.apache.doris.spark.exception.OptionRequiredException;
 import org.apache.doris.spark.rest.models.Field;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.util.HttpUtil;
 import org.apache.doris.spark.util.HttpUtils;
 import org.apache.doris.spark.util.URLs;
 
@@ -157,12 +158,17 @@ public class DorisFrontendClient implements Serializable {
         Exception ex = null;
         for (Frontend frontEnd : frontEnds) {
             try {
-                return reqFunc.apply(frontEnd, httpClient);
+                if(HttpUtil.tryHttpConnection(frontEnd.hostHttpPortString())){
+                    return reqFunc.apply(frontEnd, httpClient);
+                }
             } catch (Exception e) {
                 LOG.warn("fe http request on {} failed, err: {}", 
frontEnd.hostHttpPortString(), e.getMessage());
                 ex = e;
             }
         }
+        if (ex == null) {
+            ex = new Exception("All frontends failed to execute request.");
+        }
         throw ex;
     }
 
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Backend.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Backend.java
index 36b8e73..5a4ce6c 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Backend.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Backend.java
@@ -77,4 +77,8 @@ public class Backend implements Serializable {
         return String.format("%s:%d", host, rpcPort);
     }
 
+    public String hostHttpPortString() {
+        return host + ":" + httpPort;
+    }
+
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index f64d201..c9a7bb9 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -90,8 +90,6 @@ public abstract class AbstractStreamLoadProcessor<R> extends 
DorisWriter<R> impl
     private byte[] lineDelimiter;
     private String groupCommit;
     private PipedOutputStream output;
-    private boolean createNewBatch = true;
-    private boolean isFirstRecordOfBatch = true;
     private transient ExecutorService executor;
 
     private Future<StreamLoadResponse> requestFuture = null;
@@ -426,6 +424,7 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                 logger.error("stream load exception", e);
                 unexpectedException = e;
                 currentThread.interrupt();
+                throw e;
             }
             return streamLoadResponse;
         });
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
index ada89c7..7ba7f9b 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
@@ -25,8 +25,9 @@ import java.io.Serializable;
 public abstract class DorisWriter<R> implements Serializable {
 
     protected int batchSize;
-
     protected int currentBatchCount = 0;
+    protected boolean createNewBatch = true;
+    protected boolean isFirstRecordOfBatch = true;
 
     public DorisWriter(int batchSize) {
         if (batchSize <= 0) {
@@ -50,7 +51,9 @@ public abstract class DorisWriter<R> implements Serializable {
     }
 
     public void resetBatchCount() {
-        currentBatchCount = 0;
+        this.currentBatchCount = 0;
+        this.createNewBatch = true;
+        this.isFirstRecordOfBatch = true;
     }
 
 }
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/HttpUtil.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/HttpUtil.java
new file mode 100644
index 0000000..fe3228c
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/HttpUtil.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Lightweight HTTP reachability probe, used to skip unreachable FE/BE nodes before
+ * sending them a real request.
+ */
+public class HttpUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(HttpUtil.class);
+
+    /** Default connect and read timeout of the probe, in milliseconds. */
+    private static final int DEFAULT_TIMEOUT_MS = 60000;
+
+    /**
+     * Probes {@code http://<host>} with a GET request using the default 60s timeout.
+     *
+     * @param host address in {@code host:port} form; {@code http://} is prepended
+     * @return {@code true} if the response code is below 500; {@code false} on a 5xx
+     *     status or on any exception (refused connection, timeout, malformed address, ...)
+     */
+    public static boolean tryHttpConnection(String host) {
+        return tryHttpConnection(host, DEFAULT_TIMEOUT_MS);
+    }
+
+    /**
+     * Probes {@code http://<host>} with a GET request using the given timeout.
+     *
+     * @param host address in {@code host:port} form; {@code http://} is prepended
+     * @param timeoutMs connect and read timeout in milliseconds, passed to HttpURLConnection
+     * @return {@code true} if the response code is below 500; {@code false} on a 5xx
+     *     status or on any exception (refused connection, timeout, malformed address, ...)
+     */
+    public static boolean tryHttpConnection(String host, int timeoutMs) {
+        LOG.debug("try to connect host {}", host);
+        String address = "http://" + host;
+        HttpURLConnection connection = null;
+        try {
+            URL url = new URL(address);
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("GET");
+            connection.setConnectTimeout(timeoutMs);
+            connection.setReadTimeout(timeoutMs);
+            int responseCode = connection.getResponseCode();
+            if (responseCode < 500) {
+                // Only 5xx (server-side error) counts as unhealthy; any lower status shows
+                // the node is up and answering HTTP.
+                return true;
+            }
+            LOG.warn(
+                    "Failed to connect host {}, responseCode={}, msg={}",
+                    address,
+                    responseCode,
+                    connection.getResponseMessage());
+            return false;
+        } catch (Exception ex) {
+            LOG.warn("Failed to connect to host:{}, cause {}", address, ex.getMessage());
+            return false;
+        } finally {
+            // Release the connection even when getResponseCode() times out or throws.
+            if (connection != null) {
+                connection.disconnect();
+            }
+        }
+    }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index bc92a93..0020c3d 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -84,10 +84,10 @@ class DorisDataWriter(config: DorisConfig, schema: 
StructType, partitionId: Int,
     Retry.exec[Unit, Exception](retries, 
Duration.ofMillis(retryIntervalMs.toLong), log) {
       if (isRetrying) {
         // retrying, reload data from buffer
-        do {
+        while (writer.getBatchCount < recordBuffer.size){
           val idx = writer.getBatchCount
           writer.load(recordBuffer(idx))
-        } while (writer.getBatchCount < recordBuffer.size)
+        }
         isRetrying = false
       }
       if (writer.endOfBatch()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to