This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit 3479e9a73359162b826e5c8ea52623c14ea7b6b3
Author: 张家锋 <zhang...@gmail.com>
AuthorDate: Sun Feb 7 09:28:55 2021 +0800

    [Bug] Spark doris connector http v2 authentication fails, and HTTP v2 
interface returns json nesting problem (#5366)
    
    1. Deal with the problem of inconsistent data format returned by http v1 
and v2
    2. Deal with user authentication failure
---
 .../org/apache/doris/spark/rest/RestService.java   | 67 +++++++++++++++++-----
 .../doris/spark/sql/SparkDorisConnector.scala      | 27 +++++++++
 2 files changed, 79 insertions(+), 15 deletions(-)

diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java 
b/src/main/java/org/apache/doris/spark/rest/RestService.java
index 2404507..3c8249c 100644
--- a/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -32,17 +32,23 @@ import static 
org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESS
 import static 
org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
 import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Base64;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
 import java.util.Set;
+import java.util.HashSet;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.Settings;
@@ -113,32 +119,36 @@ public class RestService implements Serializable {
         String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
         String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
 
-        CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-        credentialsProvider.setCredentials(
-                AuthScope.ANY,
-                new UsernamePasswordCredentials(user, password));
-        HttpClientContext context = HttpClientContext.create();
-        context.setCredentialsProvider(credentialsProvider);
         logger.info("Send request to Doris FE '{}' with user '{}'.", 
request.getURI(), user);
 
         IOException ex = null;
         int statusCode = -1;
 
         for (int attempt = 0; attempt < retries; attempt++) {
-            CloseableHttpClient httpClient = HttpClients.createDefault();
             logger.debug("Attempt {} to request {}.", attempt, 
request.getURI());
             try {
-                CloseableHttpResponse response = httpClient.execute(request, 
context);
-                statusCode = response.getStatusLine().getStatusCode();
+                HttpURLConnection conn = getConnection(request, user, 
password);
+                statusCode = conn.getResponseCode();
                 if (statusCode != HttpStatus.SC_OK) {
                     logger.warn("Failed to get response from Doris FE {}, http 
code is {}",
                             request.getURI(), statusCode);
                     continue;
                 }
-                String res = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
+                InputStream stream = (InputStream) conn.getContent();
+                String res = IOUtils.toString(stream);
                 logger.trace("Success get response from Doris FE: {}, response 
is: {}.",
                         request.getURI(), res);
-                return res;
+
+                ObjectMapper mapper = new ObjectMapper();
+
+                Map map = mapper.readValue(res, Map.class);
+                //Handle the problem of inconsistent data format returned by 
http v1 and v2
+                if(map.containsKey("code") && map.containsKey("msg")) {
+                    Object data = map.get("data");
+                    return mapper.writeValueAsString(data);
+                } else {
+                    return res;
+                }
             } catch (IOException e) {
                 ex = e;
                 logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
@@ -149,6 +159,33 @@ public class RestService implements Serializable {
         throw new ConnectedFailedException(request.getURI().toString(), 
statusCode, ex);
     }
 
+
+    /**
+     * Get http connection
+     * @param request
+     * @param user
+     * @param passwd
+     * @return
+     * @throws IOException
+     */
+    private static HttpURLConnection getConnection(HttpRequestBase request, 
String user, String passwd) throws IOException {
+        URL url = new URL(request.getURI().toString());
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setInstanceFollowRedirects(false);
+        conn.setRequestMethod("POST");
+        String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
+        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+
+        InputStream content = ((HttpPost) request).getEntity().getContent();
+        String s = IOUtils.toString(content);
+
+        conn.setDoOutput(true);
+        conn.setDoInput(true);
+        PrintWriter out = new PrintWriter(conn.getOutputStream());
+        out.print(s);
+        out.flush();
+        return conn;
+    }
     /**
      * parse table identifier to array.
      * @param tableIdentifier table identifier string
diff --git 
a/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala 
b/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
new file mode 100644
index 0000000..6d299d3
--- /dev/null
+++ b/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
@@ -0,0 +1,27 @@
+package org.apache.doris.spark.sql
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+
+
+object SparkDorisConnector {
+
+    def main(args: Array[String]): Unit = {
+        val sparkConf: SparkConf = new 
SparkConf().setAppName("SparkDorisConnector").setMaster("local[*]")
+        val sc = new SparkContext(sparkConf)
+        sc.setLogLevel("DEBUG")
+        import org.apache.doris.spark._
+        val dorisSparkRDD = sc.dorisRDD(
+            tableIdentifier = Some("db.table1"),
+            cfg = Some(Map(
+                "doris.fenodes" -> "feip:8030",
+                "doris.request.auth.user" -> "root",
+                "doris.request.auth.password" -> ""
+            ))
+        )
+
+        dorisSparkRDD.map(println(_)).count()
+        sc.stop()
+    }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to