This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0d2ef93 [Enhancement](connector) enhance the FE nodes port check and improve write performance (#303)
0d2ef93 is described below
commit 0d2ef93c96c15fe8ce2c9a12cc50eb1c92554581
Author: LeiWang <[email protected]>
AuthorDate: Mon Mar 31 22:30:20 2025 +0800
[Enhancement](connector) enhance the FE nodes port check and improve write performance (#303)
---
.../doris/spark/client/DorisFrontendClient.java | 4 +-
.../client/write/AbstractStreamLoadProcessor.java | 186 ++++++++++++++-------
.../spark/client/write/StreamLoadProcessor.java | 14 +-
.../org/apache/doris/spark/config/DorisConfig.java | 6 +-
.../apache/doris/spark/rest/models/DataFormat.java | 24 +++
.../apache/doris/spark/util/RowConvertors.scala | 23 ++-
.../scala/org/apache/doris/spark/util/URLs.scala | 25 ++-
.../doris/spark/sql/TestSparkConnector.scala | 41 ++++-
8 files changed, 229 insertions(+), 94 deletions(-)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index 1c4c49c..3298be0 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -101,7 +101,7 @@ public class DorisFrontendClient implements Serializable {
for (String frontendNode : frontendNodeArray) {
String[] nodeDetails = frontendNode.split(":");
try {
- List<Frontend> list = Collections.singletonList(new Frontend(nodeDetails[0], Integer.parseInt(nodeDetails[1])));
+ List<Frontend> list = Collections.singletonList(new Frontend(nodeDetails[0], nodeDetails.length > 1 ? Integer.parseInt(nodeDetails[1]) : -1));
frontendList = requestFrontends(list, (frontend, client) -> {
HttpGet httpGet = new HttpGet(URLs.getFrontEndNodes(frontend.getHost(), frontend.getHttpPort(), isHttpsEnabled));
HttpUtils.setAuth(httpGet, username, password);
@@ -142,7 +142,7 @@ public class DorisFrontendClient implements Serializable {
return Arrays.stream(frontendNodeArray)
.map(node -> {
String[] nodeParts = node.split(":");
- return new Frontend(nodeParts[0], Integer.parseInt(nodeParts[1]), queryPort, flightSqlPort);
+ return new Frontend(nodeParts[0], nodeParts.length > 1 ? Integer.parseInt(nodeParts[1]) : -1, queryPort, flightSqlPort);
})
.collect(Collectors.toList());
}
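
For reference, a minimal standalone sketch of the port-fallback parsing these
two hunks introduce (class and method names below are illustrative, not
connector API): the node string is split on ':' and, when no port is present,
-1 is carried through so the URL builder can later drop the port segment.

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Map;

    public class FeNodeParseSketch {
        // Split "host[:port]" and fall back to -1 when no port is given.
        static Map.Entry<String, Integer> parse(String node) {
            String[] parts = node.split(":");
            int port = parts.length > 1 ? Integer.parseInt(parts[1]) : -1;
            return new SimpleImmutableEntry<>(parts[0], port);
        }

        public static void main(String[] args) {
            System.out.println(parse("fe1:8030"));    // fe1=8030
            System.out.println(parse("nginx-proxy")); // nginx-proxy=-1, port omitted later
        }
    }
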
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 37d3a48..169f5a8 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -17,6 +17,12 @@
package org.apache.doris.spark.client.write;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import org.apache.doris.spark.client.DorisBackendHttpClient;
import org.apache.doris.spark.client.DorisFrontendClient;
import org.apache.doris.spark.client.entity.Backend;
@@ -25,12 +31,11 @@ import org.apache.doris.spark.config.DorisConfig;
import org.apache.doris.spark.config.DorisOptions;
import org.apache.doris.spark.exception.OptionRequiredException;
import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.rest.models.DataFormat;
import org.apache.doris.spark.util.EscapeHandler;
import org.apache.doris.spark.util.HttpUtils;
import org.apache.doris.spark.util.URLs;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
@@ -46,8 +51,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
@@ -87,11 +90,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
private final Map<String, String> properties;
- private final String format;
+ private final DataFormat format;
protected String columnSeparator;
- private String lineDelimiter;
+ private byte[] lineDelimiter;
private final boolean isGzipCompressionEnabled;
@@ -128,7 +131,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
this.properties = config.getSinkProperties();
// init stream load props
this.isTwoPhaseCommitEnabled = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC);
- this.format = properties.getOrDefault("format", "csv");
+ this.format = DataFormat.valueOf(properties.getOrDefault("format", "csv").toUpperCase());
this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type"));
if (properties.containsKey(GROUP_COMMIT)) {
String message = "";
@@ -154,6 +157,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
}
createNewBatch = false;
}
+ if (isFirstRecordOfBatch) {
+ isFirstRecordOfBatch = false;
+ } else if (lineDelimiter != null){
+ output.write(lineDelimiter);
+ }
output.write(toFormat(row, format));
currentBatchCount++;
}
@@ -162,8 +170,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
public String stop() throws Exception {
if (requestFuture != null) {
createNewBatch = true;
+ isFirstRecordOfBatch = true;
// arrow format need to send all buffer data before stop
- if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
+ if (!recordBuffer.isEmpty() && DataFormat.ARROW.equals(format)) {
List<R> rs = new LinkedList<>(recordBuffer);
recordBuffer.clear();
output.write(toArrowFormat(rs));
@@ -187,66 +196,115 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
return null;
}
+ private void execCommitReq(String host, int port, String msg, CloseableHttpClient httpClient) {
+ HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, isHttpsEnabled));
+ try {
+ handleCommitHeaders(httpPut, msg);
+ } catch (OptionRequiredException e) {
+ throw new RuntimeException("stream load handle commit props failed", e);
+ }
+ try {
+ CloseableHttpResponse response = httpClient.execute(httpPut);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new RuntimeException("commit transaction failed, transaction: " + msg + ", status: "
+ + response.getStatusLine().getStatusCode() + ", reason: " + response.getStatusLine().getReasonPhrase());
+ } else {
+ String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()));
+ if(!checkTransResponse(resEntity)) {
+ throw new RuntimeException("commit transaction failed, transaction: " + msg + ", resp: " + resEntity);
+ } else {
+ this.logger.info("commit: {} response: {}", msg, resEntity);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("commit transaction failed, transaction: " + msg, e);
+ }
+ }
+
@Override
public void commit(String msg) throws Exception {
if (isTwoPhaseCommitEnabled) {
logger.info("begin to commit transaction {}", msg);
- frontend.requestFrontends((frontEnd, httpClient) -> {
- HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), database, isHttpsEnabled));
- try {
- handleCommitHeaders(httpPut, msg);
- } catch (OptionRequiredException e) {
- throw new RuntimeException("stream load handle commit props failed", e);
+ if (autoRedirect) {
+ frontend.requestFrontends((frontEnd, httpClient) -> {
+ execCommitReq(frontEnd.getHost(), frontEnd.getHttpPort(), msg, httpClient);
+ return null;
+ });
+ } else {
+ backendHttpClient.executeReq((backend, httpClient) -> {
+ execCommitReq(backend.getHost(), backend.getHttpPort(), msg, httpClient);
+ return null;
+ });
+ }
+ logger.info("success to commit transaction {}", msg);
+ }
+ }
+
+ private void execAbortReq(String host, int port, String msg, CloseableHttpClient httpClient) {
+ HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, isHttpsEnabled));
+ try {
+ handleAbortHeaders(httpPut, msg);
+ } catch (OptionRequiredException e) {
+ throw new RuntimeException("stream load handle abort props failed", e);
+ }
+ try {
+ CloseableHttpResponse response = httpClient.execute(httpPut);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new RuntimeException("abort transaction failed, transaction: " + msg + ", status: "
+ + response.getStatusLine().getStatusCode() + ", reason: " + response.getStatusLine().getReasonPhrase());
+ } else {
+ String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()));
+ if(!checkTransResponse(resEntity)) {
+ throw new RuntimeException("abort transaction failed, transaction: " + msg + ", resp: " + resEntity);
+ } else {
+ this.logger.info("abort: {} response: {}", msg, resEntity);
}
- try(CloseableHttpResponse response = httpClient.execute(httpPut)){
- if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw new RuntimeException("commit transaction failed, transaction: " + msg
- + ", status: " + response.getStatusLine().getStatusCode()
- + ", reason: " + response.getStatusLine().getReasonPhrase());
- }
- logger.info("commit response: {}", EntityUtils.toString(response.getEntity()));
- } catch (IOException e) {
- throw new RuntimeException("commit transaction failed, transaction: " + msg, e);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("abort transaction failed, transaction: " + msg, e);
+ }
+ }
+
+ private boolean checkTransResponse(String resEntity) {
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ String status = objectMapper.readTree(resEntity).get("status").asText();
+ if ("Success".equalsIgnoreCase(status)) {
+ return true;
}
- return null;
- });
- logger.info("success to commit transaction {}", msg);
+ } catch (JsonProcessingException e) {
+ logger.warn("invalid json response: " + resEntity, e);
}
+ return false;
}
@Override
public void abort(String msg) throws Exception {
if (isTwoPhaseCommitEnabled) {
logger.info("begin to abort transaction {}", msg);
- frontend.requestFrontends((frontEnd, httpClient) -> {
- HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), database, isHttpsEnabled));
- try {
- handleAbortHeaders(httpPut, msg);
- } catch (OptionRequiredException e) {
- throw new RuntimeException("stream load handle abort props failed", e);
- }
- try(CloseableHttpResponse response = httpClient.execute(httpPut)){
- if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw new RuntimeException("abort transaction failed, transaction: " + msg
- + ", status: " + response.getStatusLine().getStatusCode()
- + ", reason: " + response.getStatusLine().getReasonPhrase());
- }
- logger.info("abort response: {}", EntityUtils.toString(response.getEntity()));
- } catch (IOException e) {
- throw new RuntimeException("abort transaction failed, transaction: " + msg, e);
- }
- return null; // Returning null as the callback does not return anything
- });
+ if (autoRedirect) {
+ frontend.requestFrontends((frontEnd, httpClient) -> {
+ execAbortReq(frontEnd.getHost(), frontEnd.getHttpPort(), msg, httpClient);
+ return null; // Returning null as the callback does not return anything
+ });
+ } else {
+ backendHttpClient.executeReq((backend, httpClient) -> {
+ execAbortReq(backend.getHost(), backend.getHttpPort(), msg, httpClient);
+ return null; // Returning null as the callback does not return anything
+ });
+ }
logger.info("success to abort transaction {}", msg);
}
}
- private byte[] toFormat(R row, String format) throws IOException {
- switch (format.toLowerCase()) {
- case "csv":
- case "json":
+ private byte[] toFormat(R row, DataFormat format) throws IOException {
+ switch (format) {
+ case CSV:
+ case JSON:
return toStringFormat(row, format);
- case "arrow":
+ case ARROW:
recordBuffer.add(copy(row));
if (recordBuffer.size() < arrowBufferSize) {
return new byte[0];
@@ -260,16 +318,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
}
}
- private byte[] toStringFormat(R row, String format) {
- String prefix = isFirstRecordOfBatch ? "" : lineDelimiter;
- isFirstRecordOfBatch = false;
- String stringRow = isPassThrough ? getPassThroughData(row) : stringify(row, format);
- return (prefix + stringRow).getBytes(StandardCharsets.UTF_8);
+ private byte[] toStringFormat(R row, DataFormat format) {
+ return isPassThrough ? getPassThroughData(row).getBytes(StandardCharsets.UTF_8) : stringify(row, format);
}
protected abstract String getPassThroughData(R row);
- public abstract String stringify(R row, String format);
+ public abstract byte[] stringify(R row, DataFormat format);
public abstract byte[] toArrowFormat(List<R> rows) throws IOException;
@@ -288,16 +343,21 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
}
if (isTwoPhaseCommitEnabled) httpPut.setHeader("two_phase_commit", "true");
- switch (format.toLowerCase()) {
- case "csv":
+ switch (format) {
+ case CSV:
// Handling hidden delimiters
columnSeparator = EscapeHandler.escapeString(properties.getOrDefault("column_separator", "\t"));
- lineDelimiter = EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n"));
+ lineDelimiter = EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n")
+ ).getBytes(StandardCharsets.UTF_8);
break;
- case "json":
- lineDelimiter = properties.getOrDefault("line_delimiter", "\n");
+ case JSON:
+ lineDelimiter = properties.getOrDefault("line_delimiter", "\n").getBytes(
+ StandardCharsets.UTF_8);
properties.put("read_json_by_line", "true");
break;
+ case ARROW:
+ lineDelimiter = null;
+ break;
}
properties.forEach(httpPut::setHeader);
}
@@ -350,7 +410,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
@Override
public void close() throws IOException {
createNewBatch = true;
+ isFirstRecordOfBatch = true;
frontend.close();
+ if (backendHttpClient != null) {
+ backendHttpClient.close();
+ }
if (executor != null && !executor.isShutdown()) {
executor.shutdown();
}
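
A hedged sketch of the response check that checkTransResponse adds above: a
stream load 2PC commit or abort can return HTTP 200 while the body still
reports a failure, so the JSON "status" field is inspected as well. The sample
bodies below are illustrative, not captured Doris responses.

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class TransResponseCheckSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Only a body whose "status" field equals "Success" counts as success;
        // an unparsable body is treated as a failure.
        static boolean isSuccess(String body) {
            try {
                return "Success".equalsIgnoreCase(MAPPER.readTree(body).path("status").asText());
            } catch (JsonProcessingException e) {
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(isSuccess("{\"status\":\"Success\"}")); // true
            System.out.println(isSuccess("{\"status\":\"Fail\"}"));    // false
            System.out.println(isSuccess("not json"));                 // false
        }
    }
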
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
index 97ef1c0..d7e7397 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
@@ -16,9 +16,11 @@
// under the License.
package org.apache.doris.spark.client.write;
+import java.nio.charset.StandardCharsets;
import org.apache.doris.spark.config.DorisConfig;
import org.apache.doris.spark.config.DorisOptions;
import org.apache.doris.spark.exception.OptionRequiredException;
+import org.apache.doris.spark.rest.models.DataFormat;
import org.apache.doris.spark.util.RowConvertors;
import org.apache.arrow.memory.RootAllocator;
@@ -26,7 +28,6 @@ import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.arrow.ArrowWriter;
@@ -74,12 +75,13 @@ public class StreamLoadProcessor extends AbstractStreamLoadProcessor<InternalRow
}
@Override
- public String stringify(InternalRow row, String format) {
+ public byte[] stringify(InternalRow row, DataFormat format) {
switch (format) {
- case "csv":
- return RowConvertors.convertToCsv(row, schema, columnSeparator);
- case "json":
- return RowConvertors.convertToJson(row, schema);
+ case CSV:
+ return RowConvertors.convertToCsv(row, schema, columnSeparator).getBytes(
+ StandardCharsets.UTF_8);
+ case JSON:
+ return RowConvertors.convertToJsonBytes(row, schema);
default:
return null;
}
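
The byte[] returned here feeds the write path shown earlier in
AbstractStreamLoadProcessor: each record is encoded to UTF-8 once and written
straight to the output stream, with the line delimiter written as bytes only
between records. A rough standalone sketch of that pattern (the writer class
below is made up for illustration, not a connector class):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class DelimitedByteWriterSketch {
        private final OutputStream out;
        private final byte[] lineDelimiter;
        private boolean firstRecordOfBatch = true;

        DelimitedByteWriterSketch(OutputStream out, String delimiter) {
            this.out = out;
            this.lineDelimiter = delimiter.getBytes(StandardCharsets.UTF_8);
        }

        // Write the delimiter only between records, never before the first one.
        void write(byte[] record) throws IOException {
            if (firstRecordOfBatch) {
                firstRecordOfBatch = false;
            } else {
                out.write(lineDelimiter);
            }
            out.write(record);
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DelimitedByteWriterSketch writer = new DelimitedByteWriterSketch(buf, "\n");
            List<String> rows = Arrays.asList("zhangsan\tm", "lisi\tf");
            for (String row : rows) {
                writer.write(row.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println(buf.toString(StandardCharsets.UTF_8.name()));
        }
    }
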
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
index 4faab19..79ed053 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
@@ -81,8 +81,10 @@ public class DorisConfig implements Serializable {
String feNodes = options.get(DorisOptions.DORIS_FENODES.getName());
if (feNodes.isEmpty()) {
throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is empty");
- } else if (!feNodes.contains(":")) {
- throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]");
+ } else if (feNodes.contains(":")) {
+ if (!feNodes.matches("([\\w-.]+:\\d+,)*([\\w-.]+:\\d+)")) {
+ throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]");
+ }
}
}
if (!ignoreTableCheck) {
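
A quick sketch of how the relaxed doris.fenodes check above behaves, reusing
the same pattern and the same inputs as the new test further down (plain
String.matches, no connector code involved): values containing ':' must be a
comma-separated list of host:port pairs, while values without ':' such as a
proxy address are now accepted.

    public class FeNodesFormatSketch {
        private static final String PATTERN = "([\\w-.]+:\\d+,)*([\\w-.]+:\\d+)";

        static boolean accepted(String feNodes) {
            // Mirror of the check above: only validate the pattern when the
            // value contains a ':' at all.
            return !feNodes.contains(":") || feNodes.matches(PATTERN);
        }

        public static void main(String[] args) {
            System.out.println(accepted("fe1:8080,fe2:8120"));  // true
            System.out.println(accepted("nginx/test"));         // true, no port to check
            System.out.println(accepted("fe1:8080,fe2:8120,")); // false, trailing comma
            System.out.println(accepted("fe1:8080;fe2:8120"));  // false, wrong separator
        }
    }
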
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java
new file mode 100644
index 0000000..05d4830
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.rest.models;
+
+public enum DataFormat {
+ CSV,
+ JSON,
+ ARROW
+}
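
The processor change above maps the "format" sink property onto this enum via
DataFormat.valueOf(value.toUpperCase()); a small standalone sketch of that
mapping, assuming unknown values simply surface valueOf's
IllegalArgumentException (the local enum copy below only keeps the sketch
self-contained):

    public class DataFormatParseSketch {
        // Local copy of the enum above so the sketch compiles on its own.
        enum DataFormat { CSV, JSON, ARROW }

        static DataFormat parse(String formatProperty) {
            // Default to csv, then upper-case into the enum, as the processor does.
            return DataFormat.valueOf(formatProperty == null ? "CSV" : formatProperty.toUpperCase());
        }

        public static void main(String[] args) {
            System.out.println(parse("json")); // JSON
            System.out.println(parse(null));   // CSV (default)
            try {
                parse("orc");
            } catch (IllegalArgumentException e) {
                System.out.println("unsupported format rejected: " + e.getMessage());
            }
        }
    }
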
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index 1ae0996..2d8b4d9 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -28,6 +28,7 @@ import org.apache.spark.unsafe.types.UTF8String
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}
+import java.util
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.mutable
@@ -46,11 +47,21 @@ object RowConvertors {
}
def convertToJson(row: InternalRow, schema: StructType): String = {
- MAPPER.writeValueAsString(
- (0 until schema.length).map(i => {
- schema.fields(i).name -> asScalaValue(row, schema.fields(i).dataType, i)
- }).toMap
- )
+ val map: util.HashMap[String, Any] = convertRowToMap(row, schema)
+ MAPPER.writeValueAsString(map)
+ }
+
+ private def convertRowToMap(row: InternalRow, schema: StructType) = {
+ val map = new util.HashMap[String, Any](schema.fields.size)
+ (0 until schema.length).foreach(i => {
+ map.put(schema.fields(i).name, asScalaValue(row, schema.fields(i).dataType, i))
+ })
+ map
+ }
+
+ def convertToJsonBytes(row: InternalRow, schema: StructType): Array[Byte] = {
+ val map: util.HashMap[String, Any] = convertRowToMap(row, schema)
+ MAPPER.writeValueAsBytes(map)
}
private def asScalaValue(row: SpecializedGetters, dataType: DataType, ordinal: Int): Any = {
@@ -120,7 +131,7 @@ object RowConvertors {
val keys = map.keys.toArray.map(UTF8String.fromString)
val values = map.values.toArray.map(UTF8String.fromString)
ArrayBasedMapData(keys, values)
- case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | _:DecimalType => v
+ case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | _: DecimalType => v
case _ => throw new Exception(s"Unsupported spark type: ${dataType.typeName}")
}
}
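
A small Java sketch of the serialization change above: convertToJson and
convertToJsonBytes build the same row map, but writeValueAsBytes hands UTF-8
bytes straight to the write path and skips the intermediate String plus the
extra getBytes pass. The map below is illustrative.

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class JsonBytesSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        public static void main(String[] args) throws JsonProcessingException {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("name", "zhangsan");
            row.put("gender", "m");

            // Old path: map -> String -> bytes (payload is materialized twice).
            byte[] viaString = MAPPER.writeValueAsString(row).getBytes(StandardCharsets.UTF_8);
            // New path: map -> bytes directly.
            byte[] direct = MAPPER.writeValueAsBytes(row);

            System.out.println(new String(direct, StandardCharsets.UTF_8));
            System.out.println(Arrays.equals(viaString, direct)); // true, same payload
        }
    }
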
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
index 3363490..c88bb49 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
@@ -27,39 +27,46 @@ object URLs {
def aliveBackend(feNode: String, enableHttps: Boolean = false) =
s"${schema(enableHttps)}://$feNode/api/backends?is_alive=true"
- def aliveBackend(host: String, port: Int, enableHttps: Boolean) = s"${schema(enableHttps)}://$host:$port/api/backends?is_alive=true"
+ def aliveBackend(host: String, port: Int, enableHttps: Boolean) = s"${schema(enableHttps)}://${assemblePath(host, port)}/api/backends?is_alive=true"
def tableSchema(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
s"${schema(enableHttps)}://$feNode/api/$database/$table/_schema"
def tableSchema(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String =
- tableSchema(s"$host:$port", database, table, enableHttps)
+ tableSchema(s"${assemblePath(host, port)}", database, table, enableHttps)
def queryPlan(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
s"${schema(enableHttps)}://$feNode/api/$database/$table/_query_plan"
def queryPlan(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String =
- queryPlan(s"$host:$port", database, table, enableHttps)
+ queryPlan(s"${assemblePath(host, port)}", database, table, enableHttps)
def streamLoad(feNode: String, database: String, table: String, enableHttps: Boolean = false) =
s"${schema(enableHttps)}://$feNode/api/$database/$table/_stream_load"
def streamLoad(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String =
- streamLoad(s"$host:$port", database, table, enableHttps)
+ streamLoad(s"${assemblePath(host, port)}", database, table, enableHttps)
def streamLoad2PC(feNode: String, database: String, enableHttps: Boolean = false) =
s"${schema(enableHttps)}://$feNode/api/$database/_stream_load_2pc"
def streamLoad2PC(host: String, port: Int, database: String, enableHttps: Boolean): String =
- streamLoad2PC(s"$host:$port", database, enableHttps)
+ streamLoad2PC(s"${assemblePath(host, port)}", database, enableHttps)
def getFrontEndNodes(host: String, port: Int, enableHttps: Boolean = false) =
- s"${schema(enableHttps)}://$host:$port/rest/v2/manager/node/frontends"
+ s"${schema(enableHttps)}://${assemblePath(host,
port)}/rest/v2/manager/node/frontends"
def copyIntoUpload(host: String, port: Int, enableHttps: Boolean = false) =
- s"${schema(enableHttps)}://$host:$port/copy/upload"
+ s"${schema(enableHttps)}://${assemblePath(host, port)}/copy/upload"
def copyIntoQuery(host: String, port: Int, enableHttps: Boolean = false) =
- s"${schema(enableHttps)}://$host:$port/copy/query"
-
+ s"${schema(enableHttps)}://${assemblePath(host, port)}/copy/query"
+
+ private def assemblePath(host: String, port: Int): String = {
+ if (port > 0) {
+ s"$host:$port"
+ } else {
+ s"$host"
+ }
+ }
}
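
Finally, a sketch of the URL assembly rule that assemblePath adds: a positive
port yields host:port, while the -1 placeholder from the FE-node parsing drops
the port segment entirely. Illustrative Java mirroring the Scala helper above.

    public class UrlAssembleSketch {
        // Mirror of assemblePath: omit the port when it is not positive.
        static String assemblePath(String host, int port) {
            return port > 0 ? host + ":" + port : host;
        }

        static String streamLoadUrl(String host, int port, String db, String table, boolean https) {
            String scheme = https ? "https" : "http";
            return scheme + "://" + assemblePath(host, port) + "/api/" + db + "/" + table + "/_stream_load";
        }

        public static void main(String[] args) {
            System.out.println(streamLoadUrl("fe1", 8030, "db", "tbl", false));
            // http://fe1:8030/api/db/tbl/_stream_load
            System.out.println(streamLoadUrl("nginx-proxy", -1, "db", "tbl", true));
            // https://nginx-proxy/api/db/tbl/_stream_load
        }
    }
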
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala
index a5e756c..c994029 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -17,10 +17,12 @@
package org.apache.doris.spark.sql
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.Ignore
-import org.junit.Test
+import org.junit.{Ignore, Test}
+
+import java.util
// This test need real connect info to run.
// Set the connect info before comment out this @Ignore
@@ -58,18 +60,18 @@ class TestSparkConnector {
("zhangsan", "m"),
("lisi", "f"),
("wangwu", "m")
- )).toDF("name","gender")
+ )).toDF("name", "gender")
df.write
.format("doris")
.option("doris.fenodes", dorisFeNodes)
.option("doris.table.identifier", dorisTable)
.option("user", dorisUser)
.option("password", dorisPwd)
-// .option("sink.auto-redirect", "true")
+ // .option("sink.auto-redirect", "true")
//specify your field
.option("doris.write.fields", "name,gender")
- .option("sink.batch.size",2)
- .option("sink.max-retries",2)
+ .option("sink.batch.size", 2)
+ .option("sink.max-retries", 2)
.save()
session.stop()
}
@@ -111,11 +113,34 @@ class TestSparkConnector {
.option("doris.fenodes", dorisFeNodes)
.option("user", dorisUser)
.option("password", dorisPwd)
- .option("sink.batch.size",2)
- .option("sink.max-retries",2)
+ .option("sink.batch.size", 2)
+ .option("sink.max-retries", 2)
.start().awaitTermination()
spark.stop()
}
+ @Test
+ def dorisConfigCheckTest(): Unit = {
+ val map = new util.HashMap[String, String]()
+ map.put(DorisOptions.DORIS_TABLE_IDENTIFIER.getName, "db.table")
+ map.put(DorisOptions.DORIS_USER.getName, "user")
+ map.put(DorisOptions.DORIS_PASSWORD.getName, "pwd")
+ for (url <- List("fe1:8080,fe2:8120", "nginx/test")) {
+ map.put(DorisOptions.DORIS_FENODES.getName, url)
+ DorisConfig.fromMap(map, false)
+ }
+
+ for (url <- List("fe1:8080,fe2:8120,", "fe1:8080;fe2:8120")) {
+ var exRaised = false
+ try {
+ map.put(DorisOptions.DORIS_FENODES.getName, url)
+ DorisConfig.fromMap(map, false)
+ } catch {
+ case _:IllegalArgumentException =>
+ exRaised = true
+ }
+ assert(exRaised)
+ }
+ }
}