This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d2ef93  [Enhancement](connector) enhance the FE nodes port check and improve write performance (#303)
0d2ef93 is described below

commit 0d2ef93c96c15fe8ce2c9a12cc50eb1c92554581
Author: LeiWang <leihz1...@gmail.com>
AuthorDate: Mon Mar 31 22:30:20 2025 +0800

    [Enhancement](connector) enhance the FE nodes port check and improve write performance (#303)
---
 .../doris/spark/client/DorisFrontendClient.java    |   4 +-
 .../client/write/AbstractStreamLoadProcessor.java  | 186 ++++++++++++++-------
 .../spark/client/write/StreamLoadProcessor.java    |  14 +-
 .../org/apache/doris/spark/config/DorisConfig.java |   6 +-
 .../apache/doris/spark/rest/models/DataFormat.java |  24 +++
 .../apache/doris/spark/util/RowConvertors.scala    |  23 ++-
 .../scala/org/apache/doris/spark/util/URLs.scala   |  25 ++-
 .../doris/spark/sql/TestSparkConnector.scala       |  41 ++++-
 8 files changed, 229 insertions(+), 94 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index 1c4c49c..3298be0 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -101,7 +101,7 @@ public class DorisFrontendClient implements Serializable {
             for (String frontendNode : frontendNodeArray) {
                 String[] nodeDetails = frontendNode.split(":");
                 try {
-                    List<Frontend> list = Collections.singletonList(new Frontend(nodeDetails[0], Integer.parseInt(nodeDetails[1])));
+                    List<Frontend> list = Collections.singletonList(new Frontend(nodeDetails[0], nodeDetails.length > 1 ? Integer.parseInt(nodeDetails[1]) : -1));
                     frontendList = requestFrontends(list, (frontend, client) -> {
                         HttpGet httpGet = new HttpGet(URLs.getFrontEndNodes(frontend.getHost(), frontend.getHttpPort(), isHttpsEnabled));
                         HttpUtils.setAuth(httpGet, username, password);
@@ -142,7 +142,7 @@ public class DorisFrontendClient implements Serializable {
             return Arrays.stream(frontendNodeArray)
                     .map(node -> {
                         String[] nodeParts = node.split(":");
-                        return new Frontend(nodeParts[0], Integer.parseInt(nodeParts[1]), queryPort, flightSqlPort);
+                        return new Frontend(nodeParts[0], nodeParts.length > 1 ? Integer.parseInt(nodeParts[1]) : -1, queryPort, flightSqlPort);
                     })
                     .collect(Collectors.toList());
         }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 37d3a48..169f5a8 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -17,6 +17,12 @@
 
 package org.apache.doris.spark.client.write;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import org.apache.doris.spark.client.DorisBackendHttpClient;
 import org.apache.doris.spark.client.DorisFrontendClient;
 import org.apache.doris.spark.client.entity.Backend;
@@ -25,12 +31,11 @@ import org.apache.doris.spark.config.DorisConfig;
 import org.apache.doris.spark.config.DorisOptions;
 import org.apache.doris.spark.exception.OptionRequiredException;
 import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.rest.models.DataFormat;
 import org.apache.doris.spark.util.EscapeHandler;
 import org.apache.doris.spark.util.HttpUtils;
 import org.apache.doris.spark.util.URLs;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.json.JsonMapper;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
@@ -46,8 +51,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -87,11 +90,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
 
     private final Map<String, String> properties;
 
-    private final String format;
+    private final DataFormat format;
 
     protected String columnSeparator;
 
-    private String lineDelimiter;
+    private byte[] lineDelimiter;
 
     private final boolean isGzipCompressionEnabled;
 
@@ -128,7 +131,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
         this.properties = config.getSinkProperties();
         // init stream load props
         this.isTwoPhaseCommitEnabled = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC);
-        this.format = properties.getOrDefault("format", "csv");
+        this.format = DataFormat.valueOf(properties.getOrDefault("format", "csv").toUpperCase());
         this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type"));
         if (properties.containsKey(GROUP_COMMIT)) {
             String message = "";
@@ -154,6 +157,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
             }
             createNewBatch = false;
         }
+        if (isFirstRecordOfBatch) {
+            isFirstRecordOfBatch = false;
+        } else if (lineDelimiter != null){
+            output.write(lineDelimiter);
+        }
         output.write(toFormat(row, format));
         currentBatchCount++;
     }
@@ -162,8 +170,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
     public String stop() throws Exception {
         if (requestFuture != null) {
             createNewBatch = true;
+            isFirstRecordOfBatch = true;
             // arrow format need to send all buffer data before stop
-            if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
+            if (!recordBuffer.isEmpty() && DataFormat.ARROW.equals(format)) {
                 List<R> rs = new LinkedList<>(recordBuffer);
                 recordBuffer.clear();
                 output.write(toArrowFormat(rs));
@@ -187,66 +196,115 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
         return null;
     }
 
+    private void execCommitReq(String host, int port, String msg, CloseableHttpClient httpClient) {
+        HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, isHttpsEnabled));
+        try {
+            handleCommitHeaders(httpPut, msg);
+        } catch (OptionRequiredException e) {
+            throw new RuntimeException("stream load handle commit props failed", e);
+        }
+        try {
+            CloseableHttpResponse response = httpClient.execute(httpPut);
+            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                throw new RuntimeException("commit transaction failed, transaction: " + msg + ", status: "
+                        + response.getStatusLine().getStatusCode() + ", reason: " + response.getStatusLine()
+                        .getReasonPhrase());
+            } else {
+                String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()));
+                if(!checkTransResponse(resEntity)) {
+                    throw new RuntimeException("commit transaction failed, transaction: " + msg + ", resp: " + resEntity);
+                } else {
+                    this.logger.info("commit: {} response: {}", msg, resEntity);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("commit transaction failed, transaction: " + msg, e);
+        }
+    }
+
     @Override
     public void commit(String msg) throws Exception {
         if (isTwoPhaseCommitEnabled) {
             logger.info("begin to commit transaction {}", msg);
-            frontend.requestFrontends((frontEnd, httpClient) -> {
-                HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), database, isHttpsEnabled));
-                try {
-                    handleCommitHeaders(httpPut, msg);
-                } catch (OptionRequiredException e) {
-                    throw new RuntimeException("stream load handle commit props failed", e);
+            if (autoRedirect) {
+                frontend.requestFrontends((frontEnd, httpClient) -> {
+                    execCommitReq(frontEnd.getHost(), frontEnd.getHttpPort(), msg, httpClient);
+                    return null;
+                });
+            } else {
+                backendHttpClient.executeReq((backend, httpClient) -> {
+                    execCommitReq(backend.getHost(), backend.getHttpPort(), msg, httpClient);
+                    return null;
+                });
+            }
+            logger.info("success to commit transaction {}", msg);
+        }
+    }
+
+    private void execAbortReq(String host, int port, String msg, CloseableHttpClient httpClient) {
+        HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, isHttpsEnabled));
+        try {
+            handleAbortHeaders(httpPut, msg);
+        } catch (OptionRequiredException e) {
+            throw new RuntimeException("stream load handle abort props failed", e);
+        }
+        try {
+            CloseableHttpResponse response = httpClient.execute(httpPut);
+            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                throw new RuntimeException("abort transaction failed, transaction: " + msg + ", status: "
+                        + response.getStatusLine().getStatusCode() + ", reason: " + response.getStatusLine()
+                        .getReasonPhrase());
+            } else {
+                String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()));
+                if(!checkTransResponse(resEntity)) {
+                    throw new RuntimeException("abort transaction failed, transaction: " + msg + ", resp: " + resEntity);
+                } else {
+                    this.logger.info("abort: {} response: {}", msg, resEntity);
                 }
-                try(CloseableHttpResponse response = httpClient.execute(httpPut)){
-                    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-                        throw new RuntimeException("commit transaction failed, transaction: " + msg
-                                + ", status: " + response.getStatusLine().getStatusCode()
-                                + ", reason: " + response.getStatusLine().getReasonPhrase());
-                    }
-                    logger.info("commit response: {}", EntityUtils.toString(response.getEntity()));
-                } catch (IOException e) {
-                    throw new RuntimeException("commit transaction failed, transaction: " + msg, e);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("abort transaction failed, transaction: " + msg, e);
+        }
+    }
+
+    private boolean checkTransResponse(String resEntity) {
+        ObjectMapper objectMapper = new ObjectMapper();
+        try {
+            String status = objectMapper.readTree(resEntity).get("status").asText();
+                if ("Success".equalsIgnoreCase(status)) {
+                    return true;
                 }
-                return null;
-            });
-            logger.info("success to commit transaction {}", msg);
+        } catch (JsonProcessingException e) {
+            logger.warn("invalid json response: " +  resEntity, e);
         }
+        return false;
     }
 
     @Override
     public void abort(String msg) throws Exception {
         if (isTwoPhaseCommitEnabled) {
             logger.info("begin to abort transaction {}", msg);
-            frontend.requestFrontends((frontEnd, httpClient) -> {
-                HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), database, isHttpsEnabled));
-                try {
-                    handleAbortHeaders(httpPut, msg);
-                } catch (OptionRequiredException e) {
-                    throw new RuntimeException("stream load handle abort props failed", e);
-                }
-                try(CloseableHttpResponse response = httpClient.execute(httpPut)){
-                    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-                        throw new RuntimeException("abort transaction failed, transaction: " + msg
-                                + ", status: " + response.getStatusLine().getStatusCode()
-                                + ", reason: " + response.getStatusLine().getReasonPhrase());
-                    }
-                    logger.info("abort response: {}", EntityUtils.toString(response.getEntity()));
-                } catch (IOException e) {
-                    throw new RuntimeException("abort transaction failed, transaction: " + msg, e);
-                }
-                return null; // Returning null as the callback does not return anything
-            });
+            if (autoRedirect) {
+                frontend.requestFrontends((frontEnd, httpClient) -> {
+                    execAbortReq(frontEnd.getHost(), frontEnd.getHttpPort(), msg, httpClient);
+                    return null; // Returning null as the callback does not return anything
+                });
+            } else {
+                backendHttpClient.executeReq((backend, httpClient) -> {
+                    execAbortReq(backend.getHost(), backend.getHttpPort(), msg, httpClient);
+                    return null; // Returning null as the callback does not return anything
+                });
+            }
             logger.info("success to abort transaction {}", msg);
         }
     }
 
-    private byte[] toFormat(R row, String format) throws IOException {
-        switch (format.toLowerCase()) {
-            case "csv":
-            case "json":
+    private byte[] toFormat(R row, DataFormat format) throws IOException {
+        switch (format) {
+            case CSV:
+            case JSON:
                 return toStringFormat(row, format);
-            case "arrow":
+            case ARROW:
                 recordBuffer.add(copy(row));
                 if (recordBuffer.size() < arrowBufferSize) {
                     return new byte[0];
@@ -260,16 +318,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
         }
     }
 
-    private byte[] toStringFormat(R row, String format) {
-        String prefix = isFirstRecordOfBatch ? "" : lineDelimiter;
-        isFirstRecordOfBatch = false;
-        String stringRow = isPassThrough ? getPassThroughData(row) : stringify(row, format);
-        return (prefix + stringRow).getBytes(StandardCharsets.UTF_8);
+    private byte[] toStringFormat(R row, DataFormat format) {
+        return isPassThrough ? getPassThroughData(row).getBytes(StandardCharsets.UTF_8) : stringify(row, format);
     }
 
     protected abstract String getPassThroughData(R row);
 
-    public abstract String stringify(R row, String format);
+    public abstract byte[] stringify(R row, DataFormat format);
 
     public abstract byte[] toArrowFormat(List<R> rows) throws IOException;
 
@@ -288,16 +343,21 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
         }
         if (isTwoPhaseCommitEnabled) httpPut.setHeader("two_phase_commit", "true");
 
-        switch (format.toLowerCase()) {
-            case "csv":
+        switch (format) {
+            case CSV:
                 // Handling hidden delimiters
                 columnSeparator = EscapeHandler.escapeString(properties.getOrDefault("column_separator", "\t"));
-                lineDelimiter = EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n"));
+                lineDelimiter = EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n")
+                ).getBytes(StandardCharsets.UTF_8);
                 break;
-            case "json":
-                lineDelimiter = properties.getOrDefault("line_delimiter", "\n");
+            case JSON:
+                lineDelimiter = properties.getOrDefault("line_delimiter", "\n").getBytes(
+                    StandardCharsets.UTF_8);
                 properties.put("read_json_by_line", "true");
                 break;
+            case ARROW:
+                lineDelimiter = null;
+                break;
         }
         properties.forEach(httpPut::setHeader);
     }
@@ -350,7 +410,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
     @Override
     public void close() throws IOException {
         createNewBatch = true;
+        isFirstRecordOfBatch = true;
         frontend.close();
+        if (backendHttpClient != null) {
+            backendHttpClient.close();
+        }
         if (executor != null && !executor.isShutdown()) {
             executor.shutdown();
         }
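
Note on the write path above: the line delimiter is now kept as UTF-8 bytes and written only between records, never before the first record of a batch, replacing the old per-row string concatenation in toStringFormat. A minimal standalone sketch of that scheme (illustrative names, not connector code):

    import java.io.ByteArrayOutputStream
    import java.nio.charset.StandardCharsets

    object DelimiterSketch {
      def main(args: Array[String]): Unit = {
        val lineDelimiter: Array[Byte] = "\n".getBytes(StandardCharsets.UTF_8)
        val output = new ByteArrayOutputStream()
        var isFirstRecordOfBatch = true

        // Write the delimiter between records only, mirroring the batched write above.
        def writeRecord(row: Array[Byte]): Unit = {
          if (isFirstRecordOfBatch) isFirstRecordOfBatch = false
          else output.write(lineDelimiter)
          output.write(row)
        }

        Seq("a,1", "b,2").map(_.getBytes(StandardCharsets.UTF_8)).foreach(writeRecord)
        assert(output.toString(StandardCharsets.UTF_8.name()) == "a,1\nb,2")
      }
    }
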
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
index 97ef1c0..d7e7397 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
@@ -16,9 +16,11 @@
 // under the License.
 package org.apache.doris.spark.client.write;
 
+import java.nio.charset.StandardCharsets;
 import org.apache.doris.spark.config.DorisConfig;
 import org.apache.doris.spark.config.DorisOptions;
 import org.apache.doris.spark.exception.OptionRequiredException;
+import org.apache.doris.spark.rest.models.DataFormat;
 import org.apache.doris.spark.util.RowConvertors;
 
 import org.apache.arrow.memory.RootAllocator;
@@ -26,7 +28,6 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.spark.SparkContext;
 import org.apache.spark.TaskContext;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.arrow.ArrowWriter;
@@ -74,12 +75,13 @@ public class StreamLoadProcessor extends AbstractStreamLoadProcessor<InternalRow
     }
 
     @Override
-    public String stringify(InternalRow row, String format) {
+    public byte[] stringify(InternalRow row, DataFormat format) {
         switch (format) {
-            case "csv":
-                return RowConvertors.convertToCsv(row, schema, columnSeparator);
-            case "json":
-                return RowConvertors.convertToJson(row, schema);
+            case CSV:
+                return RowConvertors.convertToCsv(row, schema, columnSeparator).getBytes(
+                    StandardCharsets.UTF_8);
+            case JSON:
+                return RowConvertors.convertToJsonBytes(row, schema);
             default:
                 return null;
         }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
index 4faab19..79ed053 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
@@ -81,8 +81,10 @@ public class DorisConfig implements Serializable {
             String feNodes = options.get(DorisOptions.DORIS_FENODES.getName());
             if (feNodes.isEmpty()) {
                 throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is empty");
-            } else if (!feNodes.contains(":")) {
-                throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]");
+            } else if (feNodes.contains(":")) {
+                if (!feNodes.matches("([\\w-.]+:\\d+,)*([\\w-.]+:\\d+)")) {
+                    throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]");
+                }
             }
         }
         if (!ignoreTableCheck) {
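
Note on the tightened doris.fenodes check above: when the value contains a colon, the whole string must now match ([\w-.]+:\d+,)*([\w-.]+:\d+); a value without a colon (for example a proxy address such as nginx/test) skips the port check, which is what the new test at the end of this patch exercises. A minimal sketch of that behavior, assuming the same regex semantics (the helper name below is illustrative, not connector API):

    object FeNodesCheckSketch {
      // Mirrors the validation in DorisConfig above: accepts "host:port[,host2:port]"
      // or a colon-free address; rejects trailing commas and other separators.
      def isValidFeNodes(feNodes: String): Boolean = {
        if (feNodes.isEmpty) false
        else if (!feNodes.contains(":")) true
        else feNodes.matches("([\\w-.]+:\\d+,)*([\\w-.]+:\\d+)")
      }

      def main(args: Array[String]): Unit = {
        assert(isValidFeNodes("fe1:8080,fe2:8120"))
        assert(isValidFeNodes("nginx/test"))
        assert(!isValidFeNodes("fe1:8080,fe2:8120,"))
        assert(!isValidFeNodes("fe1:8080;fe2:8120"))
      }
    }
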
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java
new file mode 100644
index 0000000..05d4830
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.rest.models;
+
+public enum DataFormat {
+    CSV,
+    JSON,
+    ARROW
+}
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index 1ae0996..2d8b4d9 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -28,6 +28,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate}
+import java.util
 import scala.collection.JavaConverters.mapAsScalaMapConverter
 import scala.collection.mutable
 
@@ -46,11 +47,21 @@ object RowConvertors {
   }
 
   def convertToJson(row: InternalRow, schema: StructType): String = {
-    MAPPER.writeValueAsString(
-      (0 until schema.length).map(i => {
-        schema.fields(i).name -> asScalaValue(row, schema.fields(i).dataType, i)
-      }).toMap
-    )
+    val map: util.HashMap[String, Any] = convertRowToMap(row, schema)
+    MAPPER.writeValueAsString(map)
+  }
+
+  private def convertRowToMap(row: InternalRow, schema: StructType) = {
+    val map = new util.HashMap[String, Any](schema.fields.size)
+    (0 until schema.length).foreach(i => {
+      map.put(schema.fields(i).name, asScalaValue(row, schema.fields(i).dataType, i))
+    })
+    map
+  }
+
+  def convertToJsonBytes(row: InternalRow, schema: StructType): Array[Byte] = {
+    val map: util.HashMap[String, Any] = convertRowToMap(row, schema)
+    MAPPER.writeValueAsBytes(map)
   }
 
   private def asScalaValue(row: SpecializedGetters, dataType: DataType, ordinal: Int): Any = {
@@ -120,7 +131,7 @@ object RowConvertors {
         val keys = map.keys.toArray.map(UTF8String.fromString)
         val values = map.values.toArray.map(UTF8String.fromString)
         ArrayBasedMapData(keys, values)
-      case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | _:DecimalType => v
+      case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | _: DecimalType => v
       case _ => throw new Exception(s"Unsupported spark type: ${dataType.typeName}")
     }
   }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
index 3363490..c88bb49 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
@@ -27,39 +27,46 @@ object URLs {
 
   def aliveBackend(feNode: String, enableHttps: Boolean = false) = s"${schema(enableHttps)}://$feNode/api/backends?is_alive=true"

-  def aliveBackend(host: String, port: Int, enableHttps: Boolean) = s"${schema(enableHttps)}://$host:$port/api/backends?is_alive=true"
+  def aliveBackend(host: String, port: Int, enableHttps: Boolean) = s"${schema(enableHttps)}://${assemblePath(host, port)}/api/backends?is_alive=true"
 
   def tableSchema(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
     s"${schema(enableHttps)}://$feNode/api/$database/$table/_schema"
 
   def tableSchema(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String =
-    tableSchema(s"$host:$port", database, table, enableHttps)
+    tableSchema(s"${assemblePath(host, port)}", database, table, enableHttps)
 
   def queryPlan(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
     s"${schema(enableHttps)}://$feNode/api/$database/$table/_query_plan"
 
   def queryPlan(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String =
-    queryPlan(s"$host:$port", database, table, enableHttps)
+    queryPlan(s"${assemblePath(host, port)}", database, table, enableHttps)
 
   def streamLoad(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
     s"${schema(enableHttps)}://$feNode/api/$database/$table/_stream_load"
 
   def streamLoad(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String =
-    streamLoad(s"$host:$port", database, table, enableHttps)
+    streamLoad(s"${assemblePath(host, port)}", database, table, enableHttps)
 
   def streamLoad2PC(feNode: String, database: String, enableHttps: Boolean = false) =
     s"${schema(enableHttps)}://$feNode/api/$database/_stream_load_2pc"
 
   def streamLoad2PC(host: String, port: Int, database: String, enableHttps: Boolean): String =
-    streamLoad2PC(s"$host:$port", database, enableHttps)
+    streamLoad2PC(s"${assemblePath(host, port)}", database, enableHttps)
 
   def getFrontEndNodes(host: String, port: Int, enableHttps: Boolean = false) =
-    s"${schema(enableHttps)}://$host:$port/rest/v2/manager/node/frontends"
+    s"${schema(enableHttps)}://${assemblePath(host, port)}/rest/v2/manager/node/frontends"
 
   def copyIntoUpload(host: String, port: Int, enableHttps: Boolean = false) =
-    s"${schema(enableHttps)}://$host:$port/copy/upload"
+    s"${schema(enableHttps)}://${assemblePath(host, port)}/copy/upload"
 
   def copyIntoQuery(host: String, port: Int, enableHttps: Boolean = false) =
-    s"${schema(enableHttps)}://$host:$port/copy/query"
-
+    s"${schema(enableHttps)}://${assemblePath(host, port)}/copy/query"
+
+  private def assemblePath(host: String, port: Int): String = {
+    if (port > 0) {
+      s"$host:$port"
+    } else {
+      s"$host"
+    }
+  }
 }
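
Note on the URL helpers above: combined with the DorisFrontendClient change earlier in this patch (a node listed without ":port" is parsed with port -1), assemblePath appends ":port" only for positive ports, so URLs built for a port-less, proxy-style frontend address stay well formed. A small standalone sketch of that behavior (not the connector's private helper):

    object AssemblePathSketch {
      // Same shape as the private assemblePath above: append ":port" only when the port is positive.
      def assemblePath(host: String, port: Int): String =
        if (port > 0) s"$host:$port" else s"$host"

      def main(args: Array[String]): Unit = {
        assert(assemblePath("fe1", 8030) == "fe1:8030")         // normal FE with an HTTP port
        assert(assemblePath("nginx/test", -1) == "nginx/test")  // no port given: suffix omitted
      }
    }
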
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala
index a5e756c..c994029 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -17,10 +17,12 @@
 
 package org.apache.doris.spark.sql
 
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.Ignore
-import org.junit.Test
+import org.junit.{Ignore, Test}
+
+import java.util
 
 // This test need real connect info to run.
 // Set the connect info before comment out this @Ignore
@@ -58,18 +60,18 @@ class TestSparkConnector {
       ("zhangsan", "m"),
       ("lisi", "f"),
       ("wangwu", "m")
-    )).toDF("name","gender")
+    )).toDF("name", "gender")
     df.write
       .format("doris")
       .option("doris.fenodes", dorisFeNodes)
       .option("doris.table.identifier", dorisTable)
       .option("user", dorisUser)
       .option("password", dorisPwd)
-//      .option("sink.auto-redirect", "true")
+      //      .option("sink.auto-redirect", "true")
       //specify your field
       .option("doris.write.fields", "name,gender")
-      .option("sink.batch.size",2)
-      .option("sink.max-retries",2)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
       .save()
     session.stop()
   }
@@ -111,11 +113,34 @@ class TestSparkConnector {
       .option("doris.fenodes", dorisFeNodes)
       .option("user", dorisUser)
       .option("password", dorisPwd)
-      .option("sink.batch.size",2)
-      .option("sink.max-retries",2)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
       .start().awaitTermination()
     spark.stop()
   }
 
+  @Test
+  def dorisConfigCheckTest(): Unit = {
+    val map = new util.HashMap[String, String]()
+    map.put(DorisOptions.DORIS_TABLE_IDENTIFIER.getName, "db.table")
+    map.put(DorisOptions.DORIS_USER.getName, "user")
+    map.put(DorisOptions.DORIS_PASSWORD.getName, "pwd")
+    for (url <- List("fe1:8080,fe2:8120", "nginx/test")) {
+      map.put(DorisOptions.DORIS_FENODES.getName, url)
+      DorisConfig.fromMap(map, false)
+    }
+
+    for (url <- List("fe1:8080,fe2:8120,", "fe1:8080;fe2:8120")) {
+      var exRaised = false
+      try {
+        map.put(DorisOptions.DORIS_FENODES.getName, url)
+        DorisConfig.fromMap(map, false)
+      } catch {
+        case _:IllegalArgumentException =>
+          exRaised = true
+      }
+      assert(exRaised)
+    }
+  }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
