This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 74585c3 [improve] add code check for fe request (#309) 74585c3 is described below commit 74585c3b6d123bf6b77e427211754822d3cbcf7b Author: LeiWang <leihz1...@gmail.com> AuthorDate: Thu Apr 24 15:55:40 2025 +0800 [improve] add code check for fe request (#309) --- .../doris/spark/client/DorisFrontendClient.java | 57 +++++++++++----------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java index cbc1352..cefb063 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java @@ -29,7 +29,6 @@ import org.apache.doris.spark.rest.models.Schema; import org.apache.doris.spark.util.HttpUtils; import org.apache.doris.spark.util.URLs; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; @@ -103,20 +102,17 @@ public class DorisFrontendClient implements Serializable { for (String frontendNode : frontendNodeArray) { String[] nodeDetails = frontendNode.split(":"); try { - List<Frontend> list = Collections.singletonList(new Frontend(nodeDetails[0], nodeDetails.length > 1 ? Integer.parseInt(nodeDetails[1]) : -1)); + List<Frontend> list = Collections.singletonList(new Frontend(nodeDetails[0], + nodeDetails.length > 1 ? Integer.parseInt(nodeDetails[1]) : -1)); frontendList = requestFrontends(list, (frontend, client) -> { - HttpGet httpGet = new HttpGet(URLs.getFrontEndNodes(frontend.getHost(), frontend.getHttpPort(), isHttpsEnabled)); + String url = URLs.getFrontEndNodes(frontend.getHost(), frontend.getHttpPort(), + isHttpsEnabled); + HttpGet httpGet = new HttpGet(url); HttpUtils.setAuth(httpGet, username, password); JsonNode dataNode; try { HttpResponse response = client.execute(httpGet); - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw new RuntimeException("fetch fe request failed, status: " - + response.getStatusLine().getStatusCode() - + ", reason: " + response.getStatusLine().getReasonPhrase()); - } - String entity = EntityUtils.toString(response.getEntity()); - dataNode = extractEntity(entity, "data"); + dataNode = extractDataFromResponse(response, url); } catch (IOException e) { throw new RuntimeException("fetch fe failed", e); } @@ -250,17 +246,14 @@ public class DorisFrontendClient implements Serializable { public Schema getTableSchema(String db, String table) throws Exception { return requestFrontends((frontend, httpClient) -> { - HttpGet httpGet = new HttpGet(URLs.tableSchema(frontend.getHost(), frontend.getHttpPort(), db, table, isHttpsEnabled)); + String url = URLs.tableSchema(frontend.getHost(), frontend.getHttpPort(), db, table, isHttpsEnabled); + HttpGet httpGet = new HttpGet(url); HttpUtils.setAuth(httpGet, username, password); Schema dorisSchema; try { HttpResponse response = httpClient.execute(httpGet); - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw new RuntimeException("table schema request failed, code: " + response.getStatusLine().getStatusCode() - + ", reason: " + response.getStatusLine().getReasonPhrase()); - } - String entity = EntityUtils.toString(response.getEntity()); - dorisSchema = MAPPER.readValue(extractEntity(entity, "data").traverse(), Schema.class); + JsonNode dataNode = extractDataFromResponse(response, url); + dorisSchema = MAPPER.readValue(dataNode.traverse(), Schema.class); } catch (IOException e) { throw new RuntimeException("table schema request failed", e); } @@ -311,7 +304,8 @@ public class DorisFrontendClient implements Serializable { public QueryPlan getQueryPlan(String database, String table, String sql) throws Exception { return requestFrontends((frontend, httpClient) -> { try { - HttpPost httpPost = new HttpPost(URLs.queryPlan(frontend.getHost(), frontend.getHttpPort(), database, table, isHttpsEnabled)); + String url = URLs.queryPlan(frontend.getHost(), frontend.getHttpPort(), database, table, isHttpsEnabled); + HttpPost httpPost = new HttpPost(url); HttpUtils.setAuth(httpPost, username, password); String body = MAPPER.writeValueAsString(ImmutableMap.of("sql", sql)); StringEntity stringEntity = new StringEntity(body, StandardCharsets.UTF_8); @@ -319,12 +313,7 @@ public class DorisFrontendClient implements Serializable { stringEntity.setContentType("application/json"); httpPost.setEntity(stringEntity); HttpResponse response = httpClient.execute(httpPost); - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw new DorisException("query plan request failed, code: " + response.getStatusLine().getStatusCode() - + ", reason: " + response.getStatusLine().getReasonPhrase()); - } - String entity = EntityUtils.toString(response.getEntity()); - JsonNode dataJsonNode = extractEntity(entity, "data"); + JsonNode dataJsonNode = extractDataFromResponse(response, url); if (dataJsonNode.get("exception") != null) { throw new DorisException("query plan failed, exception: " + dataJsonNode.get("exception").asText()); } @@ -335,8 +324,20 @@ public class DorisFrontendClient implements Serializable { }); } - private JsonNode extractEntity(String entityStr, String fieldName) throws JsonProcessingException { - return MAPPER.readTree(entityStr).get(fieldName); + + private JsonNode extractDataFromResponse(HttpResponse response, String url) throws IOException { + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new RuntimeException("request fe with url: [" + url + "] failed with http code: " + + response.getStatusLine().getStatusCode() + ", reason: " + + response.getStatusLine().getReasonPhrase()); + } + String entity = EntityUtils.toString(response.getEntity()); + JsonNode respNode = MAPPER.readTree(entity); + String code = respNode.get("code").asText(); + if (!"0".equalsIgnoreCase(code)) { + throw new RuntimeException("fetch fe url:[" + url + "] failed with invalid msg code, response: " + entity); + } + return respNode.get("data"); } public String[] getTableAllColumns(String db, String table) throws Exception { @@ -352,8 +353,8 @@ public class DorisFrontendClient implements Serializable { ArrayNode backendsNode; try { CloseableHttpResponse res = client.execute(httpGet); - String content = EntityUtils.toString(res.getEntity()); - backendsNode = (ArrayNode) extractEntity(content, "data").get("backends"); + JsonNode dataNode = extractDataFromResponse(res, url); + backendsNode = (ArrayNode) dataNode.get("backends"); } catch (IOException e) { throw new RuntimeException("get alive backends failed", e); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org