This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 74585c3  [improve] add code check for fe request (#309)
74585c3 is described below

commit 74585c3b6d123bf6b77e427211754822d3cbcf7b
Author: LeiWang <leihz1...@gmail.com>
AuthorDate: Thu Apr 24 15:55:40 2025 +0800

    [improve] add code check for fe request (#309)
---
 .../doris/spark/client/DorisFrontendClient.java    | 57 +++++++++++-----------
 1 file changed, 29 insertions(+), 28 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index cbc1352..cefb063 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -29,7 +29,6 @@ import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.util.HttpUtils;
 import org.apache.doris.spark.util.URLs;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.json.JsonMapper;
@@ -103,20 +102,17 @@ public class DorisFrontendClient implements Serializable {
             for (String frontendNode : frontendNodeArray) {
                 String[] nodeDetails = frontendNode.split(":");
                 try {
-                    List<Frontend> list = Collections.singletonList(new 
Frontend(nodeDetails[0], nodeDetails.length > 1 ? 
Integer.parseInt(nodeDetails[1]) : -1));
+                    List<Frontend> list = Collections.singletonList(new 
Frontend(nodeDetails[0],
+                            nodeDetails.length > 1 ? 
Integer.parseInt(nodeDetails[1]) : -1));
                     frontendList = requestFrontends(list, (frontend, client) 
-> {
-                        HttpGet httpGet = new 
HttpGet(URLs.getFrontEndNodes(frontend.getHost(), frontend.getHttpPort(), 
isHttpsEnabled));
+                        String url = URLs.getFrontEndNodes(frontend.getHost(), 
frontend.getHttpPort(),
+                                isHttpsEnabled);
+                        HttpGet httpGet = new HttpGet(url);
                         HttpUtils.setAuth(httpGet, username, password);
                         JsonNode dataNode;
                         try {
                             HttpResponse response = client.execute(httpGet);
-                            if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
-                                throw new RuntimeException("fetch fe request 
failed, status: "
-                                        + 
response.getStatusLine().getStatusCode()
-                                        + ", reason: " + 
response.getStatusLine().getReasonPhrase());
-                            }
-                            String entity = 
EntityUtils.toString(response.getEntity());
-                            dataNode = extractEntity(entity, "data");
+                            dataNode = extractDataFromResponse(response, url);
                         } catch (IOException e) {
                             throw new RuntimeException("fetch fe failed", e);
                         }
@@ -250,17 +246,14 @@ public class DorisFrontendClient implements Serializable {
 
     public Schema getTableSchema(String db, String table) throws Exception {
         return requestFrontends((frontend, httpClient) -> {
-            HttpGet httpGet = new HttpGet(URLs.tableSchema(frontend.getHost(), 
frontend.getHttpPort(), db, table, isHttpsEnabled));
+            String url = URLs.tableSchema(frontend.getHost(), 
frontend.getHttpPort(), db, table, isHttpsEnabled);
+            HttpGet httpGet = new HttpGet(url);
             HttpUtils.setAuth(httpGet, username, password);
             Schema dorisSchema;
             try {
                 HttpResponse response = httpClient.execute(httpGet);
-                if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
-                    throw new RuntimeException("table schema request failed, 
code: " + response.getStatusLine().getStatusCode()
-                            + ", reason: " + 
response.getStatusLine().getReasonPhrase());
-                }
-                String entity = EntityUtils.toString(response.getEntity());
-                dorisSchema = MAPPER.readValue(extractEntity(entity, 
"data").traverse(), Schema.class);
+                JsonNode dataNode = extractDataFromResponse(response, url);
+                dorisSchema = MAPPER.readValue(dataNode.traverse(), 
Schema.class);
             } catch (IOException e) {
                 throw new RuntimeException("table schema request failed", e);
             }
@@ -311,7 +304,8 @@ public class DorisFrontendClient implements Serializable {
     public QueryPlan getQueryPlan(String database, String table, String sql) 
throws Exception {
         return requestFrontends((frontend, httpClient) -> {
             try {
-                HttpPost httpPost = new 
HttpPost(URLs.queryPlan(frontend.getHost(), frontend.getHttpPort(), database, 
table, isHttpsEnabled));
+                String url = URLs.queryPlan(frontend.getHost(), 
frontend.getHttpPort(), database, table, isHttpsEnabled);
+                HttpPost httpPost = new HttpPost(url);
                 HttpUtils.setAuth(httpPost, username, password);
                 String body = MAPPER.writeValueAsString(ImmutableMap.of("sql", 
sql));
                 StringEntity stringEntity = new StringEntity(body, 
StandardCharsets.UTF_8);
@@ -319,12 +313,7 @@ public class DorisFrontendClient implements Serializable {
                 stringEntity.setContentType("application/json");
                 httpPost.setEntity(stringEntity);
                 HttpResponse response = httpClient.execute(httpPost);
-                if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
-                    throw new DorisException("query plan request failed, code: 
" + response.getStatusLine().getStatusCode()
-                            + ", reason: " + 
response.getStatusLine().getReasonPhrase());
-                }
-                String entity = EntityUtils.toString(response.getEntity());
-                JsonNode dataJsonNode = extractEntity(entity, "data");
+                JsonNode dataJsonNode = extractDataFromResponse(response, url);
                 if (dataJsonNode.get("exception") != null) {
                     throw new DorisException("query plan failed, exception: " 
+ dataJsonNode.get("exception").asText());
                 }
@@ -335,8 +324,20 @@ public class DorisFrontendClient implements Serializable {
         });
     }
 
-    private JsonNode extractEntity(String entityStr, String fieldName) throws 
JsonProcessingException {
-        return MAPPER.readTree(entityStr).get(fieldName);
+
+    private JsonNode extractDataFromResponse(HttpResponse response, String 
url) throws IOException {
+        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+            throw new RuntimeException("request fe with url: [" + url + "] 
failed with http code: "
+                    + response.getStatusLine().getStatusCode() + ", reason: "
+                    + response.getStatusLine().getReasonPhrase());
+        }
+        String entity = EntityUtils.toString(response.getEntity());
+        JsonNode respNode = MAPPER.readTree(entity);
+        String code = respNode.get("code").asText();
+        if (!"0".equalsIgnoreCase(code)) {
+            throw new RuntimeException("fetch fe url:[" + url + "] failed with 
invalid msg code, response: " + entity);
+        }
+        return respNode.get("data");
     }
 
     public String[] getTableAllColumns(String db, String table) throws 
Exception {
@@ -352,8 +353,8 @@ public class DorisFrontendClient implements Serializable {
             ArrayNode backendsNode;
             try {
                 CloseableHttpResponse res = client.execute(httpGet);
-                String content = EntityUtils.toString(res.getEntity());
-                backendsNode = (ArrayNode) extractEntity(content, 
"data").get("backends");
+                JsonNode dataNode = extractDataFromResponse(res, url);
+                backendsNode = (ArrayNode) dataNode.get("backends");
             } catch (IOException e) {
                 throw new RuntimeException("get alive backends failed", e);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to