This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new fadbada  Supports traversal of Doris FE nodes when searching for Doris 
BE (#67)
fadbada is described below

commit fadbada36f4088addd80a688191ce37693cda093
Author: MrZHui888 <18601927...@163.com>
AuthorDate: Mon Feb 6 18:34:19 2023 +0800

    Supports traversal of Doris FE nodes when searching for Doris BE (#67)
    
    Co-authored-by: 顾忠辉 <guzhong...@yiche.com>
---
 .../org/apache/doris/spark/rest/RestService.java   | 130 +++++++++++++++------
 1 file changed, 95 insertions(+), 35 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index b614e1a..a00385c 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -263,6 +263,17 @@ public class RestService implements Serializable {
                 "/";
     }
 
+    @VisibleForTesting
+    static String getUriStr(String feNode,Settings cfg, Logger logger) throws 
IllegalArgumentException {
+        String[] identifier = 
parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
+        return "http://"; +
+                feNode + API_PREFIX +
+                "/" + identifier[0] +
+                "/" + identifier[1] +
+                "/";
+    }
+
+
     /**
      * discover Doris table schema from Doris FE.
      * @param cfg configuration of request
@@ -273,10 +284,20 @@ public class RestService implements Serializable {
     public static Schema getSchema(Settings cfg, Logger logger)
             throws DorisException {
         logger.trace("Finding schema.");
-        HttpGet httpGet = new HttpGet(getUriStr(cfg, logger) + SCHEMA);
-        String response = send(cfg, httpGet, logger);
-        logger.debug("Find schema response is '{}'.", response);
-        return parseSchema(response, logger);
+        List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), 
logger);
+        for (String feNode: feNodeList) {
+            try {
+                HttpGet httpGet = new HttpGet(getUriStr(feNode,cfg, logger) + 
SCHEMA);
+                String response = send(cfg, httpGet, logger);
+                logger.debug("Find schema response is '{}'.", response);
+                return parseSchema(response, logger);
+            } catch (ConnectedFailedException e) {
+                logger.info("Doris FE node {} is unavailable: {}, Request the 
next Doris FE node", feNode, e.getMessage());
+            }
+        }
+        String errMsg = "No Doris FE is available, please check configuration";
+        logger.error(errMsg);
+        throw new DorisException(errMsg);
     }
 
     /**
@@ -337,25 +358,36 @@ public class RestService implements Serializable {
         }
         logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
 
-        HttpPost httpPost = new HttpPost(getUriStr(cfg, logger) + QUERY_PLAN);
-        String entity = "{\"sql\": \""+ sql +"\"}";
-        logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
-        StringEntity stringEntity = new StringEntity(entity, 
StandardCharsets.UTF_8);
-        stringEntity.setContentEncoding("UTF-8");
-        stringEntity.setContentType("application/json");
-        httpPost.setEntity(stringEntity);
-
-        String resStr = send(cfg, httpPost, logger);
-        logger.debug("Find partition response is '{}'.", resStr);
-        QueryPlan queryPlan = getQueryPlan(resStr, logger);
-        Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, 
logger);
-        return tabletsMapToPartition(
-                cfg,
-                be2Tablets,
-                queryPlan.getOpaqued_query_plan(),
-                tableIdentifiers[0],
-                tableIdentifiers[1],
-                logger);
+        List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), 
logger);
+        for (String feNode: feNodeList) {
+            try {
+                HttpPost httpPost = new HttpPost(getUriStr(feNode,cfg, logger) 
+ QUERY_PLAN);
+                String entity = "{\"sql\": \""+ sql +"\"}";
+                logger.debug("Post body Sending to Doris FE is: '{}'.", 
entity);
+                StringEntity stringEntity = new StringEntity(entity, 
StandardCharsets.UTF_8);
+                stringEntity.setContentEncoding("UTF-8");
+                stringEntity.setContentType("application/json");
+                httpPost.setEntity(stringEntity);
+
+                String resStr = send(cfg, httpPost, logger);
+                logger.debug("Find partition response is '{}'.", resStr);
+                QueryPlan queryPlan = getQueryPlan(resStr, logger);
+                Map<String, List<Long>> be2Tablets = 
selectBeForTablet(queryPlan, logger);
+                return tabletsMapToPartition(
+                        cfg,
+                        be2Tablets,
+                        queryPlan.getOpaqued_query_plan(),
+                        tableIdentifiers[0],
+                        tableIdentifiers[1],
+                        logger);
+            } catch (ConnectedFailedException e) {
+                logger.info("Doris FE node {} is unavailable: {}, Request the 
next Doris FE node", feNode, e.getMessage());
+            }
+        }
+        String errMsg = "No Doris FE is available, please check configuration";
+        logger.error(errMsg);
+        throw new DorisException(errMsg);
+
     }
 
     /**
@@ -536,19 +568,27 @@ public class RestService implements Serializable {
      */
     @VisibleForTesting
     public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings 
sparkSettings,  Logger logger) throws DorisException {
-        String feNodes = sparkSettings.getProperty(DORIS_FENODES);
-        String feNode = randomEndpoint(feNodes, logger);
-        String beUrl =   String.format("http://%s"; + BACKENDS_V2, feNode);
-        HttpGet httpGet = new HttpGet(beUrl);
-        String response = send(sparkSettings, httpGet, logger);
-        logger.info("Backend Info:{}", response);
-        List<BackendV2.BackendRowV2> backends = parseBackendV2(response, 
logger);
-        logger.trace("Parse beNodes '{}'.", backends);
-        if (backends == null || backends.isEmpty()) {
-            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
-            throw new IllegalArgumentException("beNodes", 
String.valueOf(backends));
+        List<String> feNodeList = 
allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger);
+        for (String feNode : feNodeList){
+            try {
+                String beUrl =   String.format("http://%s"; + BACKENDS_V2, 
feNode);
+                HttpGet httpGet = new HttpGet(beUrl);
+                String response = send(sparkSettings, httpGet, logger);
+                logger.info("Backend Info:{}", response);
+                List<BackendV2.BackendRowV2> backends = 
parseBackendV2(response, logger);
+                logger.trace("Parse beNodes '{}'.", backends);
+                if (backends == null || backends.isEmpty()) {
+                    logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", 
backends);
+                    throw new IllegalArgumentException("beNodes", 
String.valueOf(backends));
+                }
+                return backends;
+            } catch (ConnectedFailedException e) {
+                logger.info("Doris FE node {} is unavailable: {}, Request the 
next Doris FE node", feNode, e.getMessage());
+            }
         }
-        return backends;
+        String errMsg = "No Doris FE is available, please check configuration";
+        logger.error(errMsg);
+        throw new DorisException(errMsg);
     }
 
     /**
@@ -629,4 +669,24 @@ public class RestService implements Serializable {
         }
         return partitions;
     }
+
+    /**
+     * choice a Doris FE node to request.
+     *
+     * @param feNodes Doris FE node list, separate be comma
+     * @param logger  slf4j logger
+     * @return the array of Doris FE nodes
+     * @throws IllegalArgumentException fe nodes is illegal
+     */
+    @VisibleForTesting
+    static List<String> allEndpoints(String feNodes, Logger logger) throws 
IllegalArgumentException {
+        logger.trace("Parse fenodes '{}'.", feNodes);
+        if (StringUtils.isEmpty(feNodes)) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+            throw new IllegalArgumentException("fenodes", feNodes);
+        }
+        List<String> nodes = 
Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
+        Collections.shuffle(nodes);
+        return nodes;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to