This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 6f1474e7f7bc57f49320dd4468f6d08aba612919 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Fri Nov 5 09:43:06 2021 +0800 [HTTP][API] Add backends info API for spark/flink connector (#6984) Doris should provide a http api to return backends list for connectors to submit stream load, and without privilege checking, which can let common user to use it --- .../org/apache/doris/flink/rest/RestService.java | 64 ++++++++++++++++++++-- .../apache/doris/flink/rest/models/Backend.java | 1 + .../apache/doris/flink/rest/models/BackendRow.java | 1 + .../rest/models/{Backend.java => BackendV2.java} | 47 +++++++++++++--- 4 files changed, 102 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/rest/RestService.java b/src/main/java/org/apache/doris/flink/rest/RestService.java index 184afd3..1e6310c 100644 --- a/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -32,6 +32,7 @@ import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.ShouldNeverHappenException; import org.apache.doris.flink.rest.models.Backend; import org.apache.doris.flink.rest.models.BackendRow; +import org.apache.doris.flink.rest.models.BackendV2; import org.apache.doris.flink.rest.models.QueryPlan; import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.flink.rest.models.Tablet; @@ -83,7 +84,9 @@ public class RestService implements Serializable { private static final String API_PREFIX = "/api"; private static final String SCHEMA = "_schema"; private static final String QUERY_PLAN = "_query_plan"; + @Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends"; + private static final String BACKENDS_V2 = "/api/backends?is_aliva=true"; private static final String FE_LOGIN = "/rest/v1/login"; /** @@ -250,25 +253,29 @@ public class RestService implements Serializable { */ @VisibleForTesting public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { - List<BackendRow> backends = getBackends(options, readOptions, logger); + List<BackendV2.BackendRowV2> backends = getBackendsV2(options, readOptions, logger); logger.trace("Parse beNodes '{}'.", backends); if (backends == null || backends.isEmpty()) { logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends); throw new IllegalArgumentException("beNodes", String.valueOf(backends)); } Collections.shuffle(backends); - BackendRow backend = backends.get(0); - return backend.getIP() + ":" + backend.getHttpPort(); + BackendV2.BackendRowV2 backend = backends.get(0); + return backend.getIp() + ":" + backend.getHttpPort(); } /** - * get Doris BE nodes to request. + * get Doris BE nodes to request. * * @param options configuration of request * @param logger slf4j logger * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal + * + * This method is deprecated. Because it needs ADMIN_PRIV to get backends, which is not suitable for common users. + * Use getBackendsV2 instead */ + @Deprecated @VisibleForTesting static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { String feNodes = options.getFenodes(); @@ -281,6 +288,7 @@ public class RestService implements Serializable { return backends; } + @Deprecated static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException { ObjectMapper mapper = new ObjectMapper(); Backend backend; @@ -310,6 +318,54 @@ public class RestService implements Serializable { } /** + * get Doris BE nodes to request. + * + * @param options configuration of request + * @param logger slf4j logger + * @return the chosen one Doris BE node + * @throws IllegalArgumentException BE nodes is illegal + */ + @VisibleForTesting + static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { + String feNodes = options.getFenodes(); + String feNode = randomEndpoint(feNodes, logger); + String beUrl = "http://" + feNode + BACKENDS_V2; + HttpGet httpGet = new HttpGet(beUrl); + String response = send(options, readOptions, httpGet, logger); + logger.info("Backend Info:{}", response); + List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger); + return backends; + } + + static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException { + ObjectMapper mapper = new ObjectMapper(); + BackendV2 backend; + try { + backend = mapper.readValue(response, BackendV2.class); + } catch (JsonParseException e) { + String errMsg = "Doris BE's response is not a json. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } catch (JsonMappingException e) { + String errMsg = "Doris BE's response cannot map to schema. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } catch (IOException e) { + String errMsg = "Parse Doris BE's response to json failed. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } + + if (backend == null) { + logger.error(SHOULD_NOT_HAPPEN_MESSAGE); + throw new ShouldNeverHappenException(); + } + List<BackendV2.BackendRowV2> backendRows = backend.getBackends(); + logger.debug("Parsing schema result is '{}'.", backendRows); + return backendRows; + } + + /** * get a valid URI to connect Doris FE. * * @param options configuration of request diff --git a/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/src/main/java/org/apache/doris/flink/rest/models/Backend.java index d74e46f..d91614f 100644 --- a/src/main/java/org/apache/doris/flink/rest/models/Backend.java +++ b/src/main/java/org/apache/doris/flink/rest/models/Backend.java @@ -25,6 +25,7 @@ import java.util.List; /** * Be response model **/ +@Deprecated @JsonIgnoreProperties(ignoreUnknown = true) public class Backend { diff --git a/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java b/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java index 5b7df99..3dd0471 100644 --- a/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java +++ b/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.rest.models; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +@Deprecated @JsonIgnoreProperties(ignoreUnknown = true) public class BackendRow { diff --git a/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java similarity index 52% copy from src/main/java/org/apache/doris/flink/rest/models/Backend.java copy to src/main/java/org/apache/doris/flink/rest/models/BackendV2.java index d74e46f..5efb85e 100644 --- a/src/main/java/org/apache/doris/flink/rest/models/Backend.java +++ b/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java @@ -26,16 +26,49 @@ import java.util.List; * Be response model **/ @JsonIgnoreProperties(ignoreUnknown = true) -public class Backend { +public class BackendV2 { - @JsonProperty(value = "rows") - private List<BackendRow> rows; + @JsonProperty(value = "backends") + private List<BackendRowV2> backends; - public List<BackendRow> getRows() { - return rows; + public List<BackendRowV2> getBackends() { + return backends; } - public void setRows(List<BackendRow> rows) { - this.rows = rows; + public void setBackends(List<BackendRowV2> backends) { + this.backends = backends; + } + + public static class BackendRowV2 { + @JsonProperty("ip") + public String ip; + @JsonProperty("http_port") + public int httpPort; + @JsonProperty("is_alive") + public boolean isAlive; + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getHttpPort() { + return httpPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } + + public boolean isAlive() { + return isAlive; + } + + public void setAlive(boolean alive) { + isAlive = alive; + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org