This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 6f1474e7f7bc57f49320dd4468f6d08aba612919
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Fri Nov 5 09:43:06 2021 +0800

    [HTTP][API] Add backends info API for spark/flink connector (#6984)
    
    Doris should provide an HTTP API that returns the list of backends, so that
    connectors can submit stream loads. The API does no privilege checking,
    which lets common users call it.
---
 .../org/apache/doris/flink/rest/RestService.java   | 64 ++++++++++++++++++++--
 .../apache/doris/flink/rest/models/Backend.java    |  1 +
 .../apache/doris/flink/rest/models/BackendRow.java |  1 +
 .../rest/models/{Backend.java => BackendV2.java}   | 47 +++++++++++++---
 4 files changed, 102 insertions(+), 11 deletions(-)

diff --git a/src/main/java/org/apache/doris/flink/rest/RestService.java 
b/src/main/java/org/apache/doris/flink/rest/RestService.java
index 184afd3..1e6310c 100644
--- a/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -32,6 +32,7 @@ import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.ShouldNeverHappenException;
 import org.apache.doris.flink.rest.models.Backend;
 import org.apache.doris.flink.rest.models.BackendRow;
+import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.rest.models.QueryPlan;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.rest.models.Tablet;
@@ -83,7 +84,9 @@ public class RestService implements Serializable {
     private static final String API_PREFIX = "/api";
     private static final String SCHEMA = "_schema";
     private static final String QUERY_PLAN = "_query_plan";
+    @Deprecated
     private static final String BACKENDS = "/rest/v1/system?path=//backends";
+    private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; // is_alive=true is meant to limit the list to alive BEs
     private static final String FE_LOGIN = "/rest/v1/login";
 
     /**
@@ -250,25 +253,29 @@ public class RestService implements Serializable {
      */
     @VisibleForTesting
     public static String randomBackend(DorisOptions options, DorisReadOptions 
readOptions, Logger logger) throws DorisException, IOException {
-        List<BackendRow> backends = getBackends(options, readOptions, logger);
+        List<BackendV2.BackendRowV2> backends = getBackendsV2(options, 
readOptions, logger);
         logger.trace("Parse beNodes '{}'.", backends);
         if (backends == null || backends.isEmpty()) {
             logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
             throw new IllegalArgumentException("beNodes", 
String.valueOf(backends));
         }
         Collections.shuffle(backends);
-        BackendRow backend = backends.get(0);
-        return backend.getIP() + ":" + backend.getHttpPort();
+        BackendV2.BackendRowV2 backend = backends.get(0);
+        return backend.getIp() + ":" + backend.getHttpPort();
     }
 
     /**
-     * get  Doris BE nodes to request.
+     * get Doris BE nodes to request.
      *
      * @param options configuration of request
      * @param logger  slf4j logger
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
+     *
+     * @deprecated This method needs ADMIN_PRIV to get backends, which is not suitable for common users.
+     * Use {@link #getBackendsV2} instead.
      */
+    @Deprecated
     @VisibleForTesting
     static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions 
readOptions, Logger logger) throws DorisException, IOException {
         String feNodes = options.getFenodes();
@@ -281,6 +288,7 @@ public class RestService implements Serializable {
         return backends;
     }
 
+    @Deprecated
     static List<BackendRow> parseBackend(String response, Logger logger) 
throws DorisException, IOException {
         ObjectMapper mapper = new ObjectMapper();
         Backend backend;
@@ -310,6 +318,54 @@ public class RestService implements Serializable {
     }
 
     /**
+     * Get the Doris BE nodes by requesting the BACKENDS_V2 API of one FE node.
+     *
+     * @param options     configuration of request
+     * @param readOptions read configuration of request
+     * @param logger      slf4j logger
+     * @return the Doris BE nodes parsed from the FE response
+     * @throws IllegalArgumentException BE nodes is illegal
+     */
+    @VisibleForTesting
+    static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
+        String feNodes = options.getFenodes();
+        String feNode = randomEndpoint(feNodes, logger);
+        String beUrl = "http://" + feNode + BACKENDS_V2;
+        HttpGet httpGet = new HttpGet(beUrl);
+        String response = send(options, readOptions, httpGet, logger);
+        logger.info("Backend Info:{}", response);
+        return parseBackendV2(response, logger);
+    }
+
+    /** Maps the JSON response of the backends API to its rows; a non-JSON or unmappable response raises DorisException. */
+    static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {
+        BackendV2 backend;
+        try {
+            backend = new ObjectMapper().readValue(response, BackendV2.class);
+        } catch (JsonParseException e) {
+            String errMsg = "Doris BE's response is not a json. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (JsonMappingException e) {
+            String errMsg = "Doris BE's response cannot map to schema. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris BE's response to json failed. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        }
+
+        if (backend == null) {
+            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new ShouldNeverHappenException();
+        }
+        List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+        logger.debug("Parsing backend result is '{}'.", backendRows);
+        return backendRows;
+    }
+
+    /**
      * get a valid URI to connect Doris FE.
      *
      * @param options configuration of request
diff --git a/src/main/java/org/apache/doris/flink/rest/models/Backend.java 
b/src/main/java/org/apache/doris/flink/rest/models/Backend.java
index d74e46f..d91614f 100644
--- a/src/main/java/org/apache/doris/flink/rest/models/Backend.java
+++ b/src/main/java/org/apache/doris/flink/rest/models/Backend.java
@@ -25,6 +25,7 @@ import java.util.List;
 /**
  * Be response model
  **/
+@Deprecated
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Backend {
 
diff --git a/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java 
b/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
index 5b7df99..3dd0471 100644
--- a/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
+++ b/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.rest.models;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+@Deprecated
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class BackendRow {
 
diff --git a/src/main/java/org/apache/doris/flink/rest/models/Backend.java 
b/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
similarity index 52%
copy from src/main/java/org/apache/doris/flink/rest/models/Backend.java
copy to src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
index d74e46f..5efb85e 100644
--- a/src/main/java/org/apache/doris/flink/rest/models/Backend.java
+++ b/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -26,16 +26,54 @@ import java.util.List;
  * Be response model
  **/
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
+public class BackendV2 {
 
-    @JsonProperty(value = "rows")
-    private List<BackendRow> rows;
+    @JsonProperty(value = "backends")
+    private List<BackendRowV2> backends;
 
-    public List<BackendRow> getRows() {
-        return rows;
+    public List<BackendRowV2> getBackends() {
+        return backends;
     }
 
-    public void setRows(List<BackendRow> rows) {
-        this.rows = rows;
+    public void setBackends(List<BackendRowV2> backends) {
+        this.backends = backends;
+    }
+
+    /**
+     * One element of the "backends" array: the ip, http_port and is_alive of a backend.
+     * Unknown properties are ignored so that extra fields in an element do not fail the mapping.
+     **/
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class BackendRowV2 {
+        @JsonProperty("ip")
+        public String ip;
+        @JsonProperty("http_port")
+        public int httpPort;
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+
+        public String getIp() {
+            return ip;
+        }
+
+        public void setIp(String ip) {
+            this.ip = ip;
+        }
+
+        public int getHttpPort() {
+            return httpPort;
+        }
+
+        public void setHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+        }
+
+        public boolean isAlive() {
+            return isAlive;
+        }
+
+        public void setAlive(boolean alive) {
+            isAlive = alive;
+        }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to