This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit 3479e9a73359162b826e5c8ea52623c14ea7b6b3 Author: 张家锋 <zhang...@gmail.com> AuthorDate: Sun Feb 7 09:28:55 2021 +0800 [Bug] Spark doris connector http v2 authentication fails, and HTTP v2 interface returns json nesting problem (#5366) 1. Deal with the problem of inconsistent data format returned by http v1 and v2 2. Deal with user authentication failure --- .../org/apache/doris/spark/rest/RestService.java | 67 +++++++++++++++++----- .../doris/spark/sql/SparkDorisConnector.scala | 27 +++++++++ 2 files changed, 79 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java index 2404507..3c8249c 100644 --- a/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -32,17 +32,23 @@ import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESS import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Base64; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.ArrayList; import java.util.Set; +import java.util.HashSet; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.Settings; @@ -113,32 +119,36 @@ public class RestService implements Serializable { String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, ""); String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""); - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(user, password)); - HttpClientContext context = HttpClientContext.create(); - context.setCredentialsProvider(credentialsProvider); logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user); IOException ex = null; int statusCode = -1; for (int attempt = 0; attempt < retries; attempt++) { - CloseableHttpClient httpClient = HttpClients.createDefault(); logger.debug("Attempt {} to request {}.", attempt, request.getURI()); try { - CloseableHttpResponse response = httpClient.execute(request, context); - statusCode = response.getStatusLine().getStatusCode(); + HttpURLConnection conn = getConnection(request, user, password); + statusCode = conn.getResponseCode(); if (statusCode != HttpStatus.SC_OK) { logger.warn("Failed to get response from Doris FE {}, http code is {}", request.getURI(), statusCode); continue; } - String res = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + InputStream stream = (InputStream) conn.getContent(); + String res = IOUtils.toString(stream); logger.trace("Success get response from Doris FE: {}, response is: {}.", request.getURI(), res); - return res; + + ObjectMapper mapper = new ObjectMapper(); + + Map map = mapper.readValue(res, Map.class); + //Handle the problem of inconsistent data format returned by http v1 and v2 + if(map.containsKey("code") && map.containsKey("msg")) { + Object data = map.get("data"); + return mapper.writeValueAsString(data); + } else { + return res; + } } catch (IOException e) { ex = e; logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e); @@ -149,6 +159,33 @@ public class RestService implements Serializable { throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex); } + + /** + * Get http connection + * @param request + * @param user + * @param passwd + * @return + * @throws IOException + */ + private static HttpURLConnection getConnection(HttpRequestBase request, String user, String passwd) throws IOException { + URL url = new URL(request.getURI().toString()); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setInstanceFollowRedirects(false); + conn.setRequestMethod("POST"); + String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + conn.setRequestProperty("Authorization", "Basic " + authEncoding); + + InputStream content = ((HttpPost) request).getEntity().getContent(); + String s = IOUtils.toString(content); + + conn.setDoOutput(true); + conn.setDoInput(true); + PrintWriter out = new PrintWriter(conn.getOutputStream()); + out.print(s); + out.flush(); + return conn; + } /** * parse table identifier to array. * @param tableIdentifier table identifier string diff --git a/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala b/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala new file mode 100644 index 0000000..6d299d3 --- /dev/null +++ b/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala @@ -0,0 +1,27 @@ +package org.apache.doris.spark.sql + +import org.apache.spark.{SparkConf, SparkContext} + + + +object SparkDorisConnector { + + def main(args: Array[String]): Unit = { + val sparkConf: SparkConf = new SparkConf().setAppName("SparkDorisConnector").setMaster("local[*]") + val sc = new SparkContext(sparkConf) + sc.setLogLevel("DEBUG") + import org.apache.doris.spark._ + val dorisSparkRDD = sc.dorisRDD( + tableIdentifier = Some("db.table1"), + cfg = Some(Map( + "doris.fenodes" -> "feip:8030", + "doris.request.auth.user" -> "root", + "doris.request.auth.password" -> "" + )) + ) + + dorisSparkRDD.map(println(_)).count() + sc.stop() + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org