This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch pick-timeoutandretrystragy in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
commit 48b413321f9b0faea598be2e684a6c654bc40b5d Author: wudi <676366...@qq.com> AuthorDate: Mon Dec 23 14:04:46 2024 +0800 and timeout and retry straegy --- .../org/apache/doris/flink/rest/RestService.java | 12 +++-- .../apache/doris/flink/rest/models/BackendV2.java | 4 ++ .../flink/table/DorisDynamicOutputFormat.java | 45 ++++++++++++++--- .../apache/doris/flink/table/DorisStreamLoad.java | 58 ++++++++++++---------- 4 files changed, 81 insertions(+), 38 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index c229cf05..9d8f7b50 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -121,7 +121,7 @@ public class RestService implements Serializable { try { String response; if (request instanceof HttpGet) { - response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(), logger); + response = getConnectionGet(request, options.getUsername(), options.getPassword(), logger); } else { response = getConnectionPost(request, options.getUsername(), options.getPassword(), logger); } @@ -162,6 +162,8 @@ public class RestService implements Serializable { String res = IOUtils.toString(content); conn.setDoOutput(true); conn.setDoInput(true); + conn.setConnectTimeout(request.getConfig().getConnectTimeout()); + conn.setReadTimeout(request.getConfig().getSocketTimeout()); PrintWriter out = new PrintWriter(conn.getOutputStream()); // send request params out.print(res); @@ -171,13 +173,15 @@ public class RestService implements Serializable { return parseResponse(conn, logger); } - private static String getConnectionGet(String request, String user, String passwd, Logger logger) throws IOException { - URL realUrl = new URL(request); + private static String getConnectionGet(HttpRequestBase request, String user, String passwd, Logger logger) throws IOException { + URL realUrl = new URL(request.getURI().toString()); // open connection HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection(); String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); connection.setRequestProperty("Authorization", "Basic " + authEncoding); + connection.setConnectTimeout(request.getConfig().getConnectTimeout()); + connection.setReadTimeout(request.getConfig().getSocketTimeout()); connection.connect(); return parseResponse(connection, logger); } @@ -346,7 +350,7 @@ public class RestService implements Serializable { * @throws IllegalArgumentException BE nodes is illegal */ @VisibleForTesting - static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { + public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { String feNodes = options.getFenodes(); List<String> feNodeList = allEndpoints(feNodes, logger); for (String feNode: feNodeList) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java index 5efb85ec..a6b5cf3f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java @@ -70,5 +70,9 @@ public class BackendV2 { public void setAlive(boolean alive) { isAlive = alive; } + + public String toBackendString() { + return ip + ":" + httpPort; + } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 065eb185..cb0a3d21 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -23,14 +23,16 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.rest.models.BackendV2; import org.apache.doris.flink.rest.models.Schema; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.RowKind; -import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.CollectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,7 +89,8 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { private DorisExecutionOptions executionOptions; private DorisStreamLoad dorisStreamLoad; private String keysType; - + private List<BackendV2.BackendRowV2> backends; + private long pos = 0L; private transient volatile boolean closed = false; private transient ScheduledExecutorService scheduler; private transient ScheduledFuture<?> scheduledFuture; @@ -105,11 +108,13 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY)); this.keysType = parseKeysType(); + handleStreamloadProp(); this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; for (int i = 0; i < logicalTypes.length; i++) { fieldGetters[i] = createFieldGetter(logicalTypes[i], i); } + } /** @@ -186,13 +191,15 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { @Override public void open(int taskNumber, int numTasks) throws IOException { + this.backends = settingBackends(); dorisStreamLoad = new DorisStreamLoad( - getBackend(), + backends.get(0).toBackendString(), options.getTableIdentifier().split("\\.")[0], options.getTableIdentifier().split("\\.")[1], options.getUsername(), options.getPassword(), - executionOptions.getStreamLoadProp()); + executionOptions.getStreamLoadProp(), + readOptions); LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr()); if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { @@ -326,9 +333,9 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { throw new IOException(e); } try { - dorisStreamLoad.setHostPort(getBackend()); + dorisStreamLoad.setHostPort(getAvailableBackend()); LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e); - Thread.sleep(1000 * i); + Thread.sleep(1000L * ( i + 1 )); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("unable to flush; interrupted while doing another attempt", e); @@ -342,11 +349,35 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { //get be url from fe return RestService.randomBackend(options, readOptions, LOG); } catch (IOException | DorisException e) { - LOG.error("get backends info fail"); + LOG.error("get backends info fail", e); throw new IOException(e); } } + private List<BackendV2.BackendRowV2> settingBackends(){ + try { + List<BackendV2.BackendRowV2> backendsV2 = RestService.getBackendsV2(options, readOptions, LOG); + if(CollectionUtil.isNullOrEmpty(backendsV2)){ + throw new RuntimeException("get no available backend."); + } + return backendsV2; + } catch (Exception e) { + LOG.error("get backends lists fail", e); + throw new RuntimeException(e); + } + } + + public String getAvailableBackend() { + long tmp = pos + backends.size(); + while (pos < tmp) { + BackendV2.BackendRowV2 backend = + backends.get((int) (pos % backends.size())); + pos++; + return backend.toBackendString(); + } + throw new RuntimeException("error cause no available backend."); + } + /** * Builder for {@link DorisDynamicOutputFormat}. */ diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index 0d5ba7fa..1ef8e20b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -20,9 +20,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.StringUtils; +import org.apache.doris.flink.cfg.ConfigurationOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.models.RespContent; import org.apache.http.HttpHeaders; +import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.StringEntity; @@ -65,17 +68,9 @@ public class DorisStreamLoad implements Serializable { private String tbl; private String authEncoding; private Properties streamLoadProp; - private final HttpClientBuilder httpClientBuilder = HttpClients - .custom() - .setRedirectStrategy(new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }); - private CloseableHttpClient httpClient; + private final HttpClientBuilder httpClientBuilder; - public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) { + public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp, DorisReadOptions readOptions) { this.hostPort = hostPort; this.db = db; this.tbl = tbl; @@ -84,7 +79,22 @@ public class DorisStreamLoad implements Serializable { this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); this.authEncoding = basicAuthHeader(user, passwd); this.streamLoadProp = streamLoadProp; - this.httpClient = httpClientBuilder.build(); + int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs(); + int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs(); + this.httpClientBuilder = HttpClients + .custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }) + .setDefaultRequestConfig( + RequestConfig.custom() + .setConnectTimeout(connectTimeout) + .setConnectionRequestTimeout(connectTimeout) + .setSocketTimeout(socketTimeout) + .build()); } public String getLoadUrlStr() { @@ -134,18 +144,20 @@ public class DorisStreamLoad implements Serializable { StringEntity entity = new StringEntity(value, "UTF-8"); put.setEntity(entity); - try (CloseableHttpResponse response = httpClient.execute(put)) { - final int statusCode = response.getStatusLine().getStatusCode(); - final String reasonPhrase = response.getStatusLine().getReasonPhrase(); - String loadResult = ""; - if (response.getEntity() != null) { - loadResult = EntityUtils.toString(response.getEntity()); + try (CloseableHttpClient httpClient = httpClientBuilder.build()){ + try (CloseableHttpResponse response = httpClient.execute(put)) { + final int statusCode = response.getStatusLine().getStatusCode(); + final String reasonPhrase = response.getStatusLine().getReasonPhrase(); + String loadResult = ""; + if (response.getEntity() != null) { + loadResult = EntityUtils.toString(response.getEntity()); + } + return new LoadResponse(statusCode, reasonPhrase, loadResult); } - return new LoadResponse(statusCode, reasonPhrase, loadResult); } } catch (Exception e) { String err = "failed to stream load data with label: " + label; - LOG.warn(err, e); + LOG.error(err, e); return new LoadResponse(-1, e.getMessage(), err); } } @@ -157,14 +169,6 @@ public class DorisStreamLoad implements Serializable { } public void close() throws IOException { - if (null != httpClient) { - try { - httpClient.close(); - } catch (IOException e) { - LOG.error("Closing httpClient failed.", e); - throw new RuntimeException("Closing httpClient failed.", e); - } - } } public static class LoadResponse { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org