This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch pick-timeoutandretrystragy
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

commit 48b413321f9b0faea598be2e684a6c654bc40b5d
Author: wudi <676366...@qq.com>
AuthorDate: Mon Dec 23 14:04:46 2024 +0800

    add timeout and retry strategy
---
 .../org/apache/doris/flink/rest/RestService.java   | 12 +++--
 .../apache/doris/flink/rest/models/BackendV2.java  |  4 ++
 .../flink/table/DorisDynamicOutputFormat.java      | 45 ++++++++++++++---
 .../apache/doris/flink/table/DorisStreamLoad.java  | 58 ++++++++++++----------
 4 files changed, 81 insertions(+), 38 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index c229cf05..9d8f7b50 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -121,7 +121,7 @@ public class RestService implements Serializable {
             try {
                 String response;
                 if (request instanceof HttpGet) {
-                    response = getConnectionGet(request.getURI().toString(), 
options.getUsername(), options.getPassword(), logger);
+                    response = getConnectionGet(request, 
options.getUsername(), options.getPassword(), logger);
                 } else {
                     response = getConnectionPost(request, 
options.getUsername(), options.getPassword(), logger);
                 }
@@ -162,6 +162,8 @@ public class RestService implements Serializable {
         String res = IOUtils.toString(content);
         conn.setDoOutput(true);
         conn.setDoInput(true);
+        conn.setConnectTimeout(request.getConfig().getConnectTimeout());
+        conn.setReadTimeout(request.getConfig().getSocketTimeout());
         PrintWriter out = new PrintWriter(conn.getOutputStream());
         // send request params
         out.print(res);
@@ -171,13 +173,15 @@ public class RestService implements Serializable {
         return parseResponse(conn, logger);
     }
 
-    private static String getConnectionGet(String request, String user, String 
passwd, Logger logger) throws IOException {
-        URL realUrl = new URL(request);
+    private static String getConnectionGet(HttpRequestBase request, String 
user, String passwd, Logger logger) throws IOException {
+        URL realUrl = new URL(request.getURI().toString());
         // open connection
         HttpURLConnection connection = (HttpURLConnection) 
realUrl.openConnection();
         String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
         connection.setRequestProperty("Authorization", "Basic " + 
authEncoding);
 
+        connection.setConnectTimeout(request.getConfig().getConnectTimeout());
+        connection.setReadTimeout(request.getConfig().getSocketTimeout());
         connection.connect();
         return parseResponse(connection, logger);
     }
@@ -346,7 +350,7 @@ public class RestService implements Serializable {
      * @throws IllegalArgumentException BE nodes is illegal
      */
     @VisibleForTesting
-    static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, 
DorisReadOptions readOptions, Logger logger) throws DorisException, IOException 
{
+    public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions 
options, DorisReadOptions readOptions, Logger logger) throws DorisException, 
IOException {
         String feNodes = options.getFenodes();
         List<String> feNodeList = allEndpoints(feNodes, logger);
         for (String feNode: feNodeList) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
index 5efb85ec..a6b5cf3f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -70,5 +70,9 @@ public class BackendV2 {
         public void setAlive(boolean alive) {
             isAlive = alive;
         }
+
+        public String toBackendString() {
+            return ip + ":" + httpPort;
+        }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 065eb185..cb0a3d21 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -23,14 +23,16 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,7 +89,8 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
     private DorisExecutionOptions executionOptions;
     private DorisStreamLoad dorisStreamLoad;
     private String keysType;
-
+    private List<BackendV2.BackendRowV2> backends;
+    private long pos = 0L;
     private transient volatile boolean closed = false;
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
@@ -105,11 +108,13 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         this.jsonFormat = 
FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
         this.keysType = parseKeysType();
 
+
         handleStreamloadProp();
         this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
         for (int i = 0; i < logicalTypes.length; i++) {
             fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
         }
+
     }
 
     /**
@@ -186,13 +191,15 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
+        this.backends = settingBackends();
         dorisStreamLoad = new DorisStreamLoad(
-                getBackend(),
+                backends.get(0).toBackendString(),
                 options.getTableIdentifier().split("\\.")[0],
                 options.getTableIdentifier().split("\\.")[1],
                 options.getUsername(),
                 options.getPassword(),
-                executionOptions.getStreamLoadProp());
+                executionOptions.getStreamLoadProp(),
+                readOptions);
         LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
 
         if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
@@ -326,9 +333,9 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
                     throw new IOException(e);
                 }
                 try {
-                    dorisStreamLoad.setHostPort(getBackend());
+                    dorisStreamLoad.setHostPort(getAvailableBackend());
                     LOG.warn("streamload error,switch be: {}", 
dorisStreamLoad.getLoadUrlStr(), e);
-                    Thread.sleep(1000 * i);
+                    Thread.sleep(1000L * ( i + 1 ));
                 } catch (InterruptedException ex) {
                     Thread.currentThread().interrupt();
                     throw new IOException("unable to flush; interrupted while 
doing another attempt", e);
@@ -342,11 +349,35 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
             //get be url from fe
             return RestService.randomBackend(options, readOptions, LOG);
         } catch (IOException | DorisException e) {
-            LOG.error("get backends info fail");
+            LOG.error("get backends info fail", e);
             throw new IOException(e);
         }
     }
 
+    private List<BackendV2.BackendRowV2> settingBackends(){
+        try {
+            List<BackendV2.BackendRowV2> backendsV2 = 
RestService.getBackendsV2(options, readOptions, LOG);
+            if(CollectionUtil.isNullOrEmpty(backendsV2)){
+                throw new RuntimeException("get no available backend.");
+            }
+            return backendsV2;
+        } catch (Exception e) {
+            LOG.error("get backends lists fail", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getAvailableBackend() {
+        long tmp = pos + backends.size();
+        while (pos < tmp) {
+            BackendV2.BackendRowV2 backend =
+                    backends.get((int) (pos % backends.size()));
+            pos++;
+            return backend.toBackendString();
+        }
+        throw new RuntimeException("error cause no available backend.");
+    }
+
     /**
      * Builder for {@link DorisDynamicOutputFormat}.
      */
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index 0d5ba7fa..1ef8e20b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -20,9 +20,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.cfg.ConfigurationOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.StringEntity;
@@ -65,17 +68,9 @@ public class DorisStreamLoad implements Serializable {
     private String tbl;
     private String authEncoding;
     private Properties streamLoadProp;
-    private final HttpClientBuilder httpClientBuilder = HttpClients
-            .custom()
-            .setRedirectStrategy(new DefaultRedirectStrategy() {
-                @Override
-                protected boolean isRedirectable(String method) {
-                    return true;
-                }
-            });
-    private CloseableHttpClient httpClient;
+    private final HttpClientBuilder httpClientBuilder;
 
-    public DorisStreamLoad(String hostPort, String db, String tbl, String 
user, String passwd, Properties streamLoadProp) {
+    public DorisStreamLoad(String hostPort, String db, String tbl, String 
user, String passwd, Properties streamLoadProp, DorisReadOptions readOptions) {
         this.hostPort = hostPort;
         this.db = db;
         this.tbl = tbl;
@@ -84,7 +79,22 @@ public class DorisStreamLoad implements Serializable {
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = basicAuthHeader(user, passwd);
         this.streamLoadProp = streamLoadProp;
-        this.httpClient = httpClientBuilder.build();
+        int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null 
? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : 
readOptions.getRequestConnectTimeoutMs();
+        int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? 
ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : 
readOptions.getRequestReadTimeoutMs();
+        this.httpClientBuilder = HttpClients
+                .custom()
+                .setRedirectStrategy(new DefaultRedirectStrategy() {
+                    @Override
+                    protected boolean isRedirectable(String method) {
+                        return true;
+                    }
+                })
+                .setDefaultRequestConfig(
+                        RequestConfig.custom()
+                                .setConnectTimeout(connectTimeout)
+                                .setConnectionRequestTimeout(connectTimeout)
+                                .setSocketTimeout(socketTimeout)
+                                .build());
     }
 
     public String getLoadUrlStr() {
@@ -134,18 +144,20 @@ public class DorisStreamLoad implements Serializable {
             StringEntity entity = new StringEntity(value, "UTF-8");
             put.setEntity(entity);
 
-            try (CloseableHttpResponse response = httpClient.execute(put)) {
-                final int statusCode = 
response.getStatusLine().getStatusCode();
-                final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
-                String loadResult = "";
-                if (response.getEntity() != null) {
-                    loadResult = EntityUtils.toString(response.getEntity());
+            try (CloseableHttpClient httpClient = httpClientBuilder.build()){
+                try (CloseableHttpResponse response = httpClient.execute(put)) 
{
+                    final int statusCode = 
response.getStatusLine().getStatusCode();
+                    final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
+                    String loadResult = "";
+                    if (response.getEntity() != null) {
+                        loadResult = 
EntityUtils.toString(response.getEntity());
+                    }
+                    return new LoadResponse(statusCode, reasonPhrase, 
loadResult);
                 }
-                return new LoadResponse(statusCode, reasonPhrase, loadResult);
             }
         } catch (Exception e) {
             String err = "failed to stream load data with label: " + label;
-            LOG.warn(err, e);
+            LOG.error(err, e);
             return new LoadResponse(-1, e.getMessage(), err);
         }
     }
@@ -157,14 +169,6 @@ public class DorisStreamLoad implements Serializable {
     }
 
     public void close() throws IOException {
-        if (null != httpClient) {
-            try {
-                httpClient.close();
-            } catch (IOException e) {
-                LOG.error("Closing httpClient failed.", e);
-                throw new RuntimeException("Closing httpClient failed.", e);
-            }
-        }
     }
 
     public static class LoadResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to