This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new fadbada Supports traversal of Doris FE nodes when searching for Doris BE (#67) fadbada is described below commit fadbada36f4088addd80a688191ce37693cda093 Author: MrZHui888 <18601927...@163.com> AuthorDate: Mon Feb 6 18:34:19 2023 +0800 Supports traversal of Doris FE nodes when searching for Doris BE (#67) Co-authored-by: 顾忠辉 <guzhong...@yiche.com> --- .../org/apache/doris/spark/rest/RestService.java | 130 +++++++++++++++------ 1 file changed, 95 insertions(+), 35 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java index b614e1a..a00385c 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -263,6 +263,17 @@ public class RestService implements Serializable { "/"; } + @VisibleForTesting + static String getUriStr(String feNode,Settings cfg, Logger logger) throws IllegalArgumentException { + String[] identifier = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger); + return "http://" + + feNode + API_PREFIX + + "/" + identifier[0] + + "/" + identifier[1] + + "/"; + } + + /** * discover Doris table schema from Doris FE. * @param cfg configuration of request @@ -273,10 +284,20 @@ public class RestService implements Serializable { public static Schema getSchema(Settings cfg, Logger logger) throws DorisException { logger.trace("Finding schema."); - HttpGet httpGet = new HttpGet(getUriStr(cfg, logger) + SCHEMA); - String response = send(cfg, httpGet, logger); - logger.debug("Find schema response is '{}'.", response); - return parseSchema(response, logger); + List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger); + for (String feNode: feNodeList) { + try { + HttpGet httpGet = new HttpGet(getUriStr(feNode,cfg, logger) + SCHEMA); + String response = send(cfg, httpGet, logger); + logger.debug("Find schema response is '{}'.", response); + return parseSchema(response, logger); + } catch (ConnectedFailedException e) { + logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage()); + } + } + String errMsg = "No Doris FE is available, please check configuration"; + logger.error(errMsg); + throw new DorisException(errMsg); } /** @@ -337,25 +358,36 @@ public class RestService implements Serializable { } logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); - HttpPost httpPost = new HttpPost(getUriStr(cfg, logger) + QUERY_PLAN); - String entity = "{\"sql\": \""+ sql +"\"}"; - logger.debug("Post body Sending to Doris FE is: '{}'.", entity); - StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); - stringEntity.setContentEncoding("UTF-8"); - stringEntity.setContentType("application/json"); - httpPost.setEntity(stringEntity); - - String resStr = send(cfg, httpPost, logger); - logger.debug("Find partition response is '{}'.", resStr); - QueryPlan queryPlan = getQueryPlan(resStr, logger); - Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger); - return tabletsMapToPartition( - cfg, - be2Tablets, - queryPlan.getOpaqued_query_plan(), - tableIdentifiers[0], - tableIdentifiers[1], - logger); + List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger); + for (String feNode: feNodeList) { + try { + HttpPost httpPost = new HttpPost(getUriStr(feNode,cfg, logger) + QUERY_PLAN); + String entity = "{\"sql\": \""+ sql +"\"}"; + logger.debug("Post body Sending to Doris FE is: '{}'.", entity); + StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); + stringEntity.setContentEncoding("UTF-8"); + stringEntity.setContentType("application/json"); + httpPost.setEntity(stringEntity); + + String resStr = send(cfg, httpPost, logger); + logger.debug("Find partition response is '{}'.", resStr); + QueryPlan queryPlan = getQueryPlan(resStr, logger); + Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger); + return tabletsMapToPartition( + cfg, + be2Tablets, + queryPlan.getOpaqued_query_plan(), + tableIdentifiers[0], + tableIdentifiers[1], + logger); + } catch (ConnectedFailedException e) { + logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage()); + } + } + String errMsg = "No Doris FE is available, please check configuration"; + logger.error(errMsg); + throw new DorisException(errMsg); + } /** @@ -536,19 +568,27 @@ public class RestService implements Serializable { */ @VisibleForTesting public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException { - String feNodes = sparkSettings.getProperty(DORIS_FENODES); - String feNode = randomEndpoint(feNodes, logger); - String beUrl = String.format("http://%s" + BACKENDS_V2, feNode); - HttpGet httpGet = new HttpGet(beUrl); - String response = send(sparkSettings, httpGet, logger); - logger.info("Backend Info:{}", response); - List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger); - logger.trace("Parse beNodes '{}'.", backends); - if (backends == null || backends.isEmpty()) { - logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends); - throw new IllegalArgumentException("beNodes", String.valueOf(backends)); + List<String> feNodeList = allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger); + for (String feNode : feNodeList){ + try { + String beUrl = String.format("http://%s" + BACKENDS_V2, feNode); + HttpGet httpGet = new HttpGet(beUrl); + String response = send(sparkSettings, httpGet, logger); + logger.info("Backend Info:{}", response); + List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger); + logger.trace("Parse beNodes '{}'.", backends); + if (backends == null || backends.isEmpty()) { + logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends); + throw new IllegalArgumentException("beNodes", String.valueOf(backends)); + } + return backends; + } catch (ConnectedFailedException e) { + logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage()); + } } - return backends; + String errMsg = "No Doris FE is available, please check configuration"; + logger.error(errMsg); + throw new DorisException(errMsg); } /** @@ -629,4 +669,24 @@ public class RestService implements Serializable { } return partitions; } + + /** + * choice a Doris FE node to request. + * + * @param feNodes Doris FE node list, separate be comma + * @param logger slf4j logger + * @return the array of Doris FE nodes + * @throws IllegalArgumentException fe nodes is illegal + */ + @VisibleForTesting + static List<String> allEndpoints(String feNodes, Logger logger) throws IllegalArgumentException { + logger.trace("Parse fenodes '{}'.", feNodes); + if (StringUtils.isEmpty(feNodes)) { + logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes); + throw new IllegalArgumentException("fenodes", feNodes); + } + List<String> nodes = Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList()); + Collections.shuffle(nodes); + return nodes; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org