This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 0d2ef93  [Enhancement](connector) enhance the FE nodes port check and improve write performace (#303)
0d2ef93 is described below

commit 0d2ef93c96c15fe8ce2c9a12cc50eb1c92554581
Author: LeiWang <leihz1...@gmail.com>
AuthorDate: Mon Mar 31 22:30:20 2025 +0800

    [Enhancement](connector) enhance the FE nodes port check and improve write performace (#303)
---
 .../doris/spark/client/DorisFrontendClient.java    |   4 +-
 .../client/write/AbstractStreamLoadProcessor.java  | 186 ++++++++++++++-------
 .../spark/client/write/StreamLoadProcessor.java    |  14 +-
 .../org/apache/doris/spark/config/DorisConfig.java |   6 +-
 .../apache/doris/spark/rest/models/DataFormat.java |  24 +++
 .../apache/doris/spark/util/RowConvertors.scala    |  23 ++-
 .../scala/org/apache/doris/spark/util/URLs.scala   |  25 ++-
 .../doris/spark/sql/TestSparkConnector.scala       |  41 ++++-
 8 files changed, 229 insertions(+), 94 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index 1c4c49c..3298be0 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -101,7 +101,7 @@ public class DorisFrontendClient implements Serializable {
         for (String frontendNode : frontendNodeArray) {
             String[] nodeDetails = frontendNode.split(":");
             try {
-                List<Frontend> list = Collections.singletonList(new Frontend(nodeDetails[0], Integer.parseInt(nodeDetails[1])));
+                List<Frontend> list = Collections.singletonList(new Frontend(nodeDetails[0], nodeDetails.length > 1 ? Integer.parseInt(nodeDetails[1]) : -1));
                 frontendList = requestFrontends(list, (frontend, client) -> {
                     HttpGet httpGet = new HttpGet(URLs.getFrontEndNodes(frontend.getHost(), frontend.getHttpPort(), isHttpsEnabled));
                     HttpUtils.setAuth(httpGet, username, password);
@@ -142,7 +142,7 @@ public class DorisFrontendClient implements Serializable {
         return Arrays.stream(frontendNodeArray)
                 .map(node -> {
                     String[] nodeParts = node.split(":");
-                    return new Frontend(nodeParts[0], Integer.parseInt(nodeParts[1]), queryPort, flightSqlPort);
+                    return new Frontend(nodeParts[0], nodeParts.length > 1 ?
Integer.parseInt(nodeParts[1]) : -1, queryPort, flightSqlPort); }) .collect(Collectors.toList()); } diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java index 37d3a48..169f5a8 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java @@ -17,6 +17,12 @@ package org.apache.doris.spark.client.write; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import org.apache.doris.spark.client.DorisBackendHttpClient; import org.apache.doris.spark.client.DorisFrontendClient; import org.apache.doris.spark.client.entity.Backend; @@ -25,12 +31,11 @@ import org.apache.doris.spark.config.DorisConfig; import org.apache.doris.spark.config.DorisOptions; import org.apache.doris.spark.exception.OptionRequiredException; import org.apache.doris.spark.exception.StreamLoadException; +import org.apache.doris.spark.rest.models.DataFormat; import org.apache.doris.spark.util.EscapeHandler; import org.apache.doris.spark.util.HttpUtils; import org.apache.doris.spark.util.URLs; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.json.JsonMapper; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.HttpStatus; @@ -46,8 +51,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashSet; @@ -87,11 +90,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl private final Map<String, String> properties; - private final String format; + private final DataFormat format; protected String columnSeparator; - private String lineDelimiter; + private byte[] lineDelimiter; private final boolean isGzipCompressionEnabled; @@ -128,7 +131,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl this.properties = config.getSinkProperties(); // init stream load props this.isTwoPhaseCommitEnabled = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC); - this.format = properties.getOrDefault("format", "csv"); + this.format = DataFormat.valueOf(properties.getOrDefault("format", "csv").toUpperCase()); this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type")); if (properties.containsKey(GROUP_COMMIT)) { String message = ""; @@ -154,6 +157,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl } createNewBatch = false; } + if (isFirstRecordOfBatch) { + isFirstRecordOfBatch = false; + } else if (lineDelimiter != null){ + output.write(lineDelimiter); + } output.write(toFormat(row, format)); currentBatchCount++; } @@ -162,8 +170,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl public String stop() 
throws Exception { if (requestFuture != null) { createNewBatch = true; + isFirstRecordOfBatch = true; // arrow format need to send all buffer data before stop - if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) { + if (!recordBuffer.isEmpty() && DataFormat.ARROW.equals(format)) { List<R> rs = new LinkedList<>(recordBuffer); recordBuffer.clear(); output.write(toArrowFormat(rs)); @@ -187,66 +196,115 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl return null; } + private void execCommitReq(String host, int port, String msg, CloseableHttpClient httpClient) { + HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, isHttpsEnabled)); + try { + handleCommitHeaders(httpPut, msg); + } catch (OptionRequiredException e) { + throw new RuntimeException("stream load handle commit props failed", e); + } + try { + CloseableHttpResponse response = httpClient.execute(httpPut); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new RuntimeException("commit transaction failed, transaction: " + msg + ", status: " + + response.getStatusLine().getStatusCode() + ", reason: " + response.getStatusLine() + .getReasonPhrase()); + } else { + String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity())); + if(!checkTransResponse(resEntity)) { + throw new RuntimeException("commit transaction failed, transaction: " + msg + ", resp: " + resEntity); + } else { + this.logger.info("commit: {} response: {}", msg, resEntity); + } + } + } catch (IOException e) { + throw new RuntimeException("commit transaction failed, transaction: " + msg, e); + } + } + @Override public void commit(String msg) throws Exception { if (isTwoPhaseCommitEnabled) { logger.info("begin to commit transaction {}", msg); - frontend.requestFrontends((frontEnd, httpClient) -> { - HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), database, isHttpsEnabled)); - try { - handleCommitHeaders(httpPut, msg); - } catch (OptionRequiredException e) { - throw new RuntimeException("stream load handle commit props failed", e); + if (autoRedirect) { + frontend.requestFrontends((frontEnd, httpClient) -> { + execCommitReq(frontEnd.getHost(), frontEnd.getHttpPort(), msg, httpClient); + return null; + }); + } else { + backendHttpClient.executeReq((backend, httpClient) -> { + execCommitReq(backend.getHost(), backend.getHttpPort(), msg, httpClient); + return null; + }); + } + logger.info("success to commit transaction {}", msg); + } + } + + private void execAbortReq(String host, int port, String msg, CloseableHttpClient httpClient) { + HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database, isHttpsEnabled)); + try { + handleAbortHeaders(httpPut, msg); + } catch (OptionRequiredException e) { + throw new RuntimeException("stream load handle abort props failed", e); + } + try { + CloseableHttpResponse response = httpClient.execute(httpPut); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new RuntimeException("abort transaction failed, transaction: " + msg + ", status: " + + response.getStatusLine().getStatusCode() + ", reason: " + response.getStatusLine() + .getReasonPhrase()); + } else { + String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity())); + if(!checkTransResponse(resEntity)) { + throw new RuntimeException("abort transaction failed, transaction: " + msg + ", resp: " + resEntity); + } else { + this.logger.info("abort: {} response: {}", msg, 
resEntity); } - try(CloseableHttpResponse response = httpClient.execute(httpPut)){ - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw new RuntimeException("commit transaction failed, transaction: " + msg - + ", status: " + response.getStatusLine().getStatusCode() - + ", reason: " + response.getStatusLine().getReasonPhrase()); - } - logger.info("commit response: {}", EntityUtils.toString(response.getEntity())); - } catch (IOException e) { - throw new RuntimeException("commit transaction failed, transaction: " + msg, e); + } + } catch (IOException e) { + throw new RuntimeException("abort transaction failed, transaction: " + msg, e); + } + } + + private boolean checkTransResponse(String resEntity) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + String status = objectMapper.readTree(resEntity).get("status").asText(); + if ("Success".equalsIgnoreCase(status)) { + return true; } - return null; - }); - logger.info("success to commit transaction {}", msg); + } catch (JsonProcessingException e) { + logger.warn("invalid json response: " + resEntity, e); } + return false; } @Override public void abort(String msg) throws Exception { if (isTwoPhaseCommitEnabled) { logger.info("begin to abort transaction {}", msg); - frontend.requestFrontends((frontEnd, httpClient) -> { - HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(), database, isHttpsEnabled)); - try { - handleAbortHeaders(httpPut, msg); - } catch (OptionRequiredException e) { - throw new RuntimeException("stream load handle abort props failed", e); - } - try(CloseableHttpResponse response = httpClient.execute(httpPut)){ - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw new RuntimeException("abort transaction failed, transaction: " + msg - + ", status: " + response.getStatusLine().getStatusCode() - + ", reason: " + response.getStatusLine().getReasonPhrase()); - } - logger.info("abort response: {}", EntityUtils.toString(response.getEntity())); - } catch (IOException e) { - throw new RuntimeException("abort transaction failed, transaction: " + msg, e); - } - return null; // Returning null as the callback does not return anything - }); + if (autoRedirect) { + frontend.requestFrontends((frontEnd, httpClient) -> { + execAbortReq(frontEnd.getHost(), frontEnd.getHttpPort(), msg, httpClient); + return null; // Returning null as the callback does not return anything + }); + } else { + backendHttpClient.executeReq((backend, httpClient) -> { + execAbortReq(backend.getHost(), backend.getHttpPort(), msg, httpClient); + return null; // Returning null as the callback does not return anything + }); + } logger.info("success to abort transaction {}", msg); } } - private byte[] toFormat(R row, String format) throws IOException { - switch (format.toLowerCase()) { - case "csv": - case "json": + private byte[] toFormat(R row, DataFormat format) throws IOException { + switch (format) { + case CSV: + case JSON: return toStringFormat(row, format); - case "arrow": + case ARROW: recordBuffer.add(copy(row)); if (recordBuffer.size() < arrowBufferSize) { return new byte[0]; @@ -260,16 +318,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl } } - private byte[] toStringFormat(R row, String format) { - String prefix = isFirstRecordOfBatch ? "" : lineDelimiter; - isFirstRecordOfBatch = false; - String stringRow = isPassThrough ? 
getPassThroughData(row) : stringify(row, format); - return (prefix + stringRow).getBytes(StandardCharsets.UTF_8); + private byte[] toStringFormat(R row, DataFormat format) { + return isPassThrough ? getPassThroughData(row).getBytes(StandardCharsets.UTF_8) : stringify(row, format); } protected abstract String getPassThroughData(R row); - public abstract String stringify(R row, String format); + public abstract byte[] stringify(R row, DataFormat format); public abstract byte[] toArrowFormat(List<R> rows) throws IOException; @@ -288,16 +343,21 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl } if (isTwoPhaseCommitEnabled) httpPut.setHeader("two_phase_commit", "true"); - switch (format.toLowerCase()) { - case "csv": + switch (format) { + case CSV: // Handling hidden delimiters columnSeparator = EscapeHandler.escapeString(properties.getOrDefault("column_separator", "\t")); - lineDelimiter = EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n")); + lineDelimiter = EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n") + ).getBytes(StandardCharsets.UTF_8); break; - case "json": - lineDelimiter = properties.getOrDefault("line_delimiter", "\n"); + case JSON: + lineDelimiter = properties.getOrDefault("line_delimiter", "\n").getBytes( + StandardCharsets.UTF_8); properties.put("read_json_by_line", "true"); break; + case ARROW: + lineDelimiter = null; + break; } properties.forEach(httpPut::setHeader); } @@ -350,7 +410,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl @Override public void close() throws IOException { createNewBatch = true; + isFirstRecordOfBatch = true; frontend.close(); + if (backendHttpClient != null) { + backendHttpClient.close(); + } if (executor != null && !executor.isShutdown()) { executor.shutdown(); } diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java index 97ef1c0..d7e7397 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java @@ -16,9 +16,11 @@ // under the License. 
package org.apache.doris.spark.client.write; +import java.nio.charset.StandardCharsets; import org.apache.doris.spark.config.DorisConfig; import org.apache.doris.spark.config.DorisOptions; import org.apache.doris.spark.exception.OptionRequiredException; +import org.apache.doris.spark.rest.models.DataFormat; import org.apache.doris.spark.util.RowConvertors; import org.apache.arrow.memory.RootAllocator; @@ -26,7 +28,6 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.spark.SparkContext; import org.apache.spark.TaskContext; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.arrow.ArrowWriter; @@ -74,12 +75,13 @@ public class StreamLoadProcessor extends AbstractStreamLoadProcessor<InternalRow } @Override - public String stringify(InternalRow row, String format) { + public byte[] stringify(InternalRow row, DataFormat format) { switch (format) { - case "csv": - return RowConvertors.convertToCsv(row, schema, columnSeparator); - case "json": - return RowConvertors.convertToJson(row, schema); + case CSV: + return RowConvertors.convertToCsv(row, schema, columnSeparator).getBytes( + StandardCharsets.UTF_8); + case JSON: + return RowConvertors.convertToJsonBytes(row, schema); default: return null; } diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java index 4faab19..79ed053 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java @@ -81,8 +81,10 @@ public class DorisConfig implements Serializable { String feNodes = options.get(DorisOptions.DORIS_FENODES.getName()); if (feNodes.isEmpty()) { throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is empty"); - } else if (!feNodes.contains(":")) { - throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]"); + } else if (feNodes.contains(":")) { + if (!feNodes.matches("([\\w-.]+:\\d+,)*([\\w-.]+:\\d+)")) { + throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]"); + } } } if (!ignoreTableCheck) { diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java new file mode 100644 index 0000000..05d4830 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataFormat.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.rest.models; + +public enum DataFormat { + CSV, + JSON, + ARROW +} diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala index 1ae0996..2d8b4d9 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala @@ -28,6 +28,7 @@ import org.apache.spark.unsafe.types.UTF8String import java.sql.{Date, Timestamp} import java.time.{Instant, LocalDate} +import java.util import scala.collection.JavaConverters.mapAsScalaMapConverter import scala.collection.mutable @@ -46,11 +47,21 @@ object RowConvertors { } def convertToJson(row: InternalRow, schema: StructType): String = { - MAPPER.writeValueAsString( - (0 until schema.length).map(i => { - schema.fields(i).name -> asScalaValue(row, schema.fields(i).dataType, i) - }).toMap - ) + val map: util.HashMap[String, Any] = convertRowToMap(row, schema) + MAPPER.writeValueAsString(map) + } + + private def convertRowToMap(row: InternalRow, schema: StructType) = { + val map = new util.HashMap[String, Any](schema.fields.size) + (0 until schema.length).foreach(i => { + map.put(schema.fields(i).name, asScalaValue(row, schema.fields(i).dataType, i)) + }) + map + } + + def convertToJsonBytes(row: InternalRow, schema: StructType): Array[Byte] = { + val map: util.HashMap[String, Any] = convertRowToMap(row, schema) + MAPPER.writeValueAsBytes(map) } private def asScalaValue(row: SpecializedGetters, dataType: DataType, ordinal: Int): Any = { @@ -120,7 +131,7 @@ object RowConvertors { val keys = map.keys.toArray.map(UTF8String.fromString) val values = map.values.toArray.map(UTF8String.fromString) ArrayBasedMapData(keys, values) - case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | _:DecimalType => v + case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType | _: DecimalType => v case _ => throw new Exception(s"Unsupported spark type: ${dataType.typeName}") } } diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala index 3363490..c88bb49 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala @@ -27,39 +27,46 @@ object URLs { def aliveBackend(feNode: String, enableHttps: Boolean = false) = s"${schema(enableHttps)}://$feNode/api/backends?is_alive=true" - def aliveBackend(host: String, port: Int, enableHttps: Boolean) = s"${schema(enableHttps)}://$host:$port/api/backends?is_alive=true" + def aliveBackend(host: String, 
port: Int, enableHttps: Boolean) = s"${schema(enableHttps)}://${assemblePath(host, port)}/api/backends?is_alive=true" def tableSchema(feNode: String, database: String, table: String, enableHttps: Boolean = false) = s"${schema(enableHttps)}://$feNode/api/$database/$table/_schema" def tableSchema(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String = - tableSchema(s"$host:$port", database, table, enableHttps) + tableSchema(s"${assemblePath(host, port)}", database, table, enableHttps) def queryPlan(feNode: String, database: String, table: String, enableHttps: Boolean = false) = s"${schema(enableHttps)}://$feNode/api/$database/$table/_query_plan" def queryPlan(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String = - queryPlan(s"$host:$port", database, table, enableHttps) + queryPlan(s"${assemblePath(host, port)}", database, table, enableHttps) def streamLoad(feNode: String, database: String, table: String, enableHttps: Boolean = false) = s"${schema(enableHttps)}://$feNode/api/$database/$table/_stream_load" def streamLoad(host: String, port: Int, database: String, table: String, enableHttps: Boolean): String = - streamLoad(s"$host:$port", database, table, enableHttps) + streamLoad(s"${assemblePath(host, port)}", database, table, enableHttps) def streamLoad2PC(feNode: String, database: String, enableHttps: Boolean = false) = s"${schema(enableHttps)}://$feNode/api/$database/_stream_load_2pc" def streamLoad2PC(host: String, port: Int, database: String, enableHttps: Boolean): String = - streamLoad2PC(s"$host:$port", database, enableHttps) + streamLoad2PC(s"${assemblePath(host, port)}", database, enableHttps) def getFrontEndNodes(host: String, port: Int, enableHttps: Boolean = false) = - s"${schema(enableHttps)}://$host:$port/rest/v2/manager/node/frontends" + s"${schema(enableHttps)}://${assemblePath(host, port)}/rest/v2/manager/node/frontends" def copyIntoUpload(host: String, port: Int, enableHttps: Boolean = false) = - s"${schema(enableHttps)}://$host:$port/copy/upload" + s"${schema(enableHttps)}://${assemblePath(host, port)}/copy/upload" def copyIntoQuery(host: String, port: Int, enableHttps: Boolean = false) = - s"${schema(enableHttps)}://$host:$port/copy/query" - + s"${schema(enableHttps)}://${assemblePath(host, port)}/copy/query" + + private def assemblePath(host: String, port: Int): String = { + if (port > 0) { + s"$host:$port" + } else { + s"$host" + } + } } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala index a5e756c..c994029 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/TestSparkConnector.scala @@ -17,10 +17,12 @@ package org.apache.doris.spark.sql +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} -import org.junit.Ignore -import org.junit.Test +import org.junit.{Ignore, Test} + +import java.util // This test need real connect info to run. 
 // Set the connect info before comment out this @Ignore
@@ -58,18 +60,18 @@ class TestSparkConnector {
       ("zhangsan", "m"),
       ("lisi", "f"),
       ("wangwu", "m")
-    )).toDF("name","gender")
+    )).toDF("name", "gender")
     df.write
       .format("doris")
       .option("doris.fenodes", dorisFeNodes)
       .option("doris.table.identifier", dorisTable)
       .option("user", dorisUser)
       .option("password", dorisPwd)
-//      .option("sink.auto-redirect", "true")
+      // .option("sink.auto-redirect", "true")
       //specify your field
       .option("doris.write.fields", "name,gender")
-      .option("sink.batch.size",2)
-      .option("sink.max-retries",2)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
       .save()
     session.stop()
   }
@@ -111,11 +113,34 @@ class TestSparkConnector {
       .option("doris.fenodes", dorisFeNodes)
       .option("user", dorisUser)
       .option("password", dorisPwd)
-      .option("sink.batch.size",2)
-      .option("sink.max-retries",2)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
       .start().awaitTermination()
     spark.stop()
   }
+  @Test
+  def dorisConfigCheckTest(): Unit = {
+    val map = new util.HashMap[String, String]()
+    map.put(DorisOptions.DORIS_TABLE_IDENTIFIER.getName, "db.table")
+    map.put(DorisOptions.DORIS_USER.getName, "user")
+    map.put(DorisOptions.DORIS_PASSWORD.getName, "pwd")
+    for (url <- List("fe1:8080,fe2:8120", "nginx/test")) {
+      map.put(DorisOptions.DORIS_FENODES.getName, url)
+      DorisConfig.fromMap(map, false)
+    }
+
+    for (url <- List("fe1:8080,fe2:8120,", "fe1:8080;fe2:8120")) {
+      var exRaised = false
+      try {
+        map.put(DorisOptions.DORIS_FENODES.getName, url)
+        DorisConfig.fromMap(map, false)
+      } catch {
+        case _:IllegalArgumentException =>
+          exRaised = true
+      }
+      assert(exRaised)
+    }
+  }
 }
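For reference, the relaxed fenodes check in DorisConfig now accepts entries without an explicit port (for example a proxy address such as the "nginx/test" value in the new test), while still rejecting malformed host:port lists. A minimal standalone sketch of that behavior; the object and method names are illustrative, only the regex and the error messages come from the change above:

    object FeNodesCheck {
      def validate(feNodes: String): Unit = {
        if (feNodes.isEmpty) {
          throw new IllegalArgumentException("option [doris.fenodes] is empty")
        }
        // Host-only values such as "nginx/test" pass through; their port later
        // defaults to -1 and is dropped when request URLs are assembled.
        if (feNodes.contains(":") && !feNodes.matches("([\\w-.]+:\\d+,)*([\\w-.]+:\\d+)")) {
          throw new IllegalArgumentException(
            "option [doris.fenodes] is not in correct format, for example: host:port[,host2:port]")
        }
      }

      def main(args: Array[String]): Unit = {
        validate("fe1:8080,fe2:8120")   // accepted
        validate("nginx/test")          // accepted: no port given
        // validate("fe1:8080;fe2:8120") // would throw IllegalArgumentException
      }
    }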
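The URL helpers now route host and port through the new private assemblePath in URLs.scala, so the -1 placeholder used for port-less fenodes never shows up in a request URL. A small sketch with the same behavior:

    object UrlPathSketch {
      // Mirrors assemblePath from URLs.scala: a non-positive port is simply left out.
      def assemblePath(host: String, port: Int): String =
        if (port > 0) s"$host:$port" else host

      def main(args: Array[String]): Unit = {
        println(assemblePath("fe1", 8030))      // fe1:8030
        println(assemblePath("nginx/test", -1)) // nginx/test
      }
    }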
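As a usage sketch (the host, table, and credentials below are placeholders, not taken from this commit), a write job can now point doris.fenodes at a load-balancer address without a port; with sink.auto-redirect enabled, the stream load and the 2PC commit/abort requests go through the FE side, matching the auto-redirect branch added above:

    import org.apache.spark.sql.SparkSession

    object ProxyWriteExample {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder().appName("doris-proxy-write").master("local[1]").getOrCreate()
        val df = session.createDataFrame(Seq(("zhangsan", "m"), ("lisi", "f"), ("wangwu", "m")))
          .toDF("name", "gender")
        df.write
          .format("doris")
          .option("doris.fenodes", "doris-proxy.example.com")   // placeholder proxy host, no port
          .option("doris.table.identifier", "test_db.test_tbl") // placeholder table
          .option("user", "root")
          .option("password", "")
          .option("sink.auto-redirect", "true")
          .save()
        session.stop()
      }
    }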
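The two-phase-commit handling is also stricter: an HTTP 200 alone no longer counts as success, the JSON body must report status "Success" (see checkTransResponse above). A small sketch of that check; the payloads in main are illustrative, not captured from a real Doris response:

    import com.fasterxml.jackson.databind.ObjectMapper

    object TransResponseCheckSketch {
      private val mapper = new ObjectMapper()

      // HTTP 200 alone is not enough any more; the body must report status "Success".
      def isTransSuccess(resEntity: String): Boolean =
        try {
          "Success".equalsIgnoreCase(mapper.readTree(resEntity).get("status").asText())
        } catch {
          case _: Exception => false
        }

      def main(args: Array[String]): Unit = {
        println(isTransSuccess("""{"status": "Success", "msg": "ok"}"""))            // true
        println(isTransSuccess("""{"status": "Fail", "msg": "already aborted"}"""))  // false
      }
    }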
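On the write path, the format string is now parsed once into the DataFormat enum, JSON rows are serialized straight to UTF-8 bytes via convertToJsonBytes, and the pre-encoded line delimiter is written between records instead of being concatenated into each row string. A simplified sketch of that idea only; the real code operates on Spark InternalRow and the connector's own buffers:

    import java.io.ByteArrayOutputStream
    import java.nio.charset.StandardCharsets
    import com.fasterxml.jackson.databind.ObjectMapper

    object JsonBytesSketch {
      private val mapper = new ObjectMapper()
      private val lineDelimiter: Array[Byte] = "\n".getBytes(StandardCharsets.UTF_8)

      // Rows go straight to UTF-8 bytes (writeValueAsBytes) and the pre-encoded delimiter
      // is written between records, instead of building "delimiter + rowString" per row.
      def writeRows(out: ByteArrayOutputStream, rows: Seq[java.util.Map[String, AnyRef]]): Unit = {
        var first = true
        rows.foreach { row =>
          if (!first) out.write(lineDelimiter)
          first = false
          out.write(mapper.writeValueAsBytes(row))
        }
      }

      def main(args: Array[String]): Unit = {
        val row = new java.util.HashMap[String, AnyRef]()
        row.put("name", "zhangsan")
        row.put("gender", "m")
        val out = new ByteArrayOutputStream()
        writeRows(out, Seq(row, row))
        println(out.toString(StandardCharsets.UTF_8.name()))
      }
    }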