This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit 00589ffc6e6768c98b3b6a28871065c75b1f1cc4 Author: wei zhao <zhaowei_3...@163.com> AuthorDate: Thu Nov 4 12:13:18 2021 +0800 [Revert] Revert RestService.java (#6994) --- .../org/apache/doris/spark/rest/RestService.java | 139 ++++++++++++++------- .../apache/doris/spark/rest/TestRestService.java | 20 +++ 2 files changed, 116 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java index dce540c..bb91538 100644 --- a/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -31,19 +31,27 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Base64; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.ArrayList; import java.util.Set; +import java.util.HashSet; import java.util.stream.Collectors; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.Settings; @@ -52,22 +60,17 @@ import org.apache.doris.spark.exception.ConnectedFailedException; import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.exception.IllegalArgumentException; import org.apache.doris.spark.exception.ShouldNeverHappenException; -import org.apache.doris.spark.rest.models.*; +import org.apache.doris.spark.rest.models.Backend; +import org.apache.doris.spark.rest.models.BackendRow; +import org.apache.doris.spark.rest.models.QueryPlan; +import org.apache.doris.spark.rest.models.Schema; +import org.apache.doris.spark.rest.models.Tablet; import org.apache.http.HttpStatus; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; @@ -83,7 +86,8 @@ public class RestService implements Serializable { private static final String API_PREFIX = "/api"; private static final String SCHEMA = "_schema"; private static final String QUERY_PLAN = "_query_plan"; - private static final String BACKENDS = "/api/show_proc?path=//backends"; + private static final String BACKENDS = "/rest/v1/system?path=//backends"; + /** * send request to Doris FE and get response json string. @@ -110,36 +114,37 @@ public class RestService implements Serializable { .build(); request.setConfig(requestConfig); - String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, ""); String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""); - - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(user, password)); - HttpClientContext context = HttpClientContext.create(); - context.setCredentialsProvider(credentialsProvider); logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user); - IOException ex = null; int statusCode = -1; for (int attempt = 0; attempt < retries; attempt++) { - CloseableHttpClient httpClient = HttpClients.createDefault(); logger.debug("Attempt {} to request {}.", attempt, request.getURI()); try { - CloseableHttpResponse response = httpClient.execute(request, context); - statusCode = response.getStatusLine().getStatusCode(); - if (statusCode != HttpStatus.SC_OK) { + String response; + if (request instanceof HttpGet){ + response = getConnectionGet(request.getURI().toString(), user, password,logger); + } else { + response = getConnectionPost(request,user, password,logger); + } + if (response == null) { logger.warn("Failed to get response from Doris FE {}, http code is {}", request.getURI(), statusCode); continue; } - String res = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); logger.trace("Success get response from Doris FE: {}, response is: {}.", - request.getURI(), res); - return res; + request.getURI(), response); + ObjectMapper mapper = new ObjectMapper(); + Map map = mapper.readValue(response, Map.class); + //Handle the problem of inconsistent data format returned by http v1 and v2 + if (map.containsKey("code") && map.containsKey("msg")) { + Object data = map.get("data"); + return mapper.writeValueAsString(data); + } else { + return response; + } } catch (IOException e) { ex = e; logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e); @@ -150,6 +155,54 @@ public class RestService implements Serializable { throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex); } + private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException { + URL realUrl = new URL(request); + // open connection + HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection(); + String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + authEncoding); + + connection.connect(); + return parseResponse(connection,logger); + } + + private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException { + if (connection.getResponseCode() != HttpStatus.SC_OK) { + logger.warn("Failed to get response from Doris {}, http code is {}", + connection.getURL(), connection.getResponseCode()); + throw new IOException("Failed to get response from Doris"); + } + String result = ""; + BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8")); + String line; + while ((line = in.readLine()) != null) { + result += line; + } + if (in != null) { + in.close(); + } + return result; + } + + private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException { + URL url = new URL(request.getURI().toString()); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setInstanceFollowRedirects(false); + conn.setRequestMethod(request.getMethod()); + String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + conn.setRequestProperty("Authorization", "Basic " + authEncoding); + InputStream content = ((HttpPost)request).getEntity().getContent(); + String res = IOUtils.toString(content); + conn.setDoOutput(true); + conn.setDoInput(true); + PrintWriter out = new PrintWriter(conn.getOutputStream()); + // send request params + out.print(res); + // flush + out.flush(); + // read response + return parseResponse(conn,logger); + } /** * parse table identifier to array. * @param tableIdentifier table identifier string @@ -426,7 +479,6 @@ public class RestService implements Serializable { * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal */ - @VisibleForTesting public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException { String feNodes = sparkSettings.getProperty(DORIS_FENODES); String feNode = randomEndpoint(feNodes, logger); @@ -446,12 +498,19 @@ public class RestService implements Serializable { } - + /** + * translate Doris FE response to inner {@link BackendRow} struct. + * @param response Doris FE response + * @param logger {@link Logger} + * @return inner {@link List<BackendRow>} struct + * @throws DorisException,IOException throw when translate failed + * */ + @VisibleForTesting static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); - List<List<String>> backend; + Backend backend; try { - backend = mapper.readValue(response, List.class); + backend = mapper.readValue(response, Backend.class); } catch (com.fasterxml.jackson.core.JsonParseException e) { String errMsg = "Doris BE's response is not a json. res: " + response; logger.error(errMsg, e); @@ -470,13 +529,7 @@ public class RestService implements Serializable { logger.error(SHOULD_NOT_HAPPEN_MESSAGE); throw new ShouldNeverHappenException(); } - List<BackendRow> backendRows = backend.stream().map(array -> { - BackendRow backendRow = new BackendRow(); - backendRow.setIP(array.get(2)); - backendRow.setHttpPort(array.get(6)); - backendRow.setAlive(Boolean.parseBoolean(array.get(10))); - return backendRow; - }).filter(v -> v.getAlive()).collect(Collectors.toList()); + List<BackendRow> backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList()); logger.debug("Parsing schema result is '{}'.", backendRows); return backendRows; } @@ -494,7 +547,7 @@ public class RestService implements Serializable { */ @VisibleForTesting static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets, - String opaquedQueryPlan, String database, String table, Logger logger) + String opaquedQueryPlan, String database, String table, Logger logger) throws IllegalArgumentException { int tabletsSize = tabletCountLimitForOnePartition(cfg, logger); List<PartitionDefinition> partitions = new ArrayList<>(); diff --git a/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/src/test/java/org/apache/doris/spark/rest/TestRestService.java index 95ae0c2..484be45 100644 --- a/src/test/java/org/apache/doris/spark/rest/TestRestService.java +++ b/src/test/java/org/apache/doris/spark/rest/TestRestService.java @@ -37,6 +37,7 @@ import org.apache.doris.spark.cfg.PropertiesSettings; import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.exception.IllegalArgumentException; +import org.apache.doris.spark.rest.models.BackendRow; import org.apache.doris.spark.rest.models.Field; import org.apache.doris.spark.rest.models.QueryPlan; import org.apache.doris.spark.rest.models.Schema; @@ -293,4 +294,23 @@ public class TestRestService { Assert.assertEquals(expected, actual); } + + @Test + public void testParseBackend() throws Exception { + String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," + + "\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," + + "\"HttpPort\",\"BrpcPort\",\"LastStartTime\",\"LastHeartbeat\",\"Alive\",\"SystemDecommissioned\"," + + "\"ClusterDecommissioned\",\"TabletNum\",\"DataUsedCapacity\",\"AvailCapacity\",\"TotalCapacity\"," + + "\"UsedPct\",\"MaxDiskUsedPct\",\"Tag\",\"ErrMsg\",\"Version\",\"Status\"],\"rows\":[{\"HttpPort\":" + + "\"8040\",\"Status\":\"{\\\"lastSuccessReportTabletsTime\\\":\\\"N/A\\\",\\\"lastStreamLoadTime\\\":" + + "-1}\",\"SystemDecommissioned\":\"false\",\"LastHeartbeat\":\"\\\\N\",\"DataUsedCapacity\":\"0.000 " + + "\",\"ErrMsg\":\"\",\"IP\":\"127.0.0.1\",\"UsedPct\":\"0.00 %\",\"__hrefPaths\":[\"/rest/v1/system?" + + "path=//backends/10002\"],\"Cluster\":\"default_cluster\",\"Alive\":\"true\",\"MaxDiskUsedPct\":" + + "\"0.00 %\",\"BrpcPort\":\"-1\",\"BePort\":\"-1\",\"ClusterDecommissioned\":\"false\"," + + "\"AvailCapacity\":\"1.000 B\",\"Version\":\"\",\"BackendId\":\"10002\",\"HeartbeatPort\":\"9050\"," + + "\"LastStartTime\":\"\\\\N\",\"TabletNum\":\"0\",\"TotalCapacity\":\"0.000 \",\"Tag\":" + + "\"{\\\"location\\\" : \\\"default\\\"}\",\"HostName\":\"localhost\"}]}"; + List<BackendRow> backendRows = RestService.parseBackend(response, logger); + Assert.assertTrue(backendRows != null && !backendRows.isEmpty()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org