This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 5f07e8f  [improve] add doris sink itcase (#297)
5f07e8f is described below

commit 5f07e8fca0598e9fc38720c2da4d0816440c0e3c
Author: wudi <676366...@qq.com>
AuthorDate: Mon Mar 31 09:57:26 2025 +0800

    [improve] add doris sink itcase (#297)
---
 .github/workflows/run-e2ecase.yml                   |  52 +++++
 .../client/write/AbstractStreamLoadProcessor.java   |  40 ++--
 .../spark/client/write/StreamLoadProcessor.java     |  10 +-
 .../apache/doris/spark/config/DorisOptions.java     |   2 +-
 .../org/apache/doris/spark/util/EscapeHandler.java  |  41 ++++
 .../org/apache/doris/spark/util/HttpUtils.scala     |   9 +-
 .../apache/doris/spark/util/RowConvertors.scala     |   3 +-
 .../spark/container/AbstractContainerTestBase.java  |  63 ++++++
 .../spark/container/instance/ContainerService.java  |   2 +
 .../spark/container/instance/DorisContainer.java    |   5 +
 .../container/instance/DorisCustomerContainer.java  |   5 +
 .../doris/spark/sql/Doris2DorisE2ECase.scala        | 110 ++++++++++
 .../doris/spark/sql/DorisCatalogITCase.scala        | 137 +++++++++++++
 .../apache/doris/spark/sql/DorisReaderITCase.scala  |   5 +-
 .../spark/sql/DorisWriterFailoverITCase.scala       | 220 ++++++++++++++++++++
 .../apache/doris/spark/sql/DorisWriterITCase.scala  | 222 +++++++++++++++++++--
 .../resources/container/ddl/write_all_type.sql      |  31 +++
 .../{log4j.properties => log4j2-test.properties}    |  11 +-
 .../apache/doris/spark/write/DorisDataWriter.scala  |   5 +-
 .../org/apache/doris/spark/write/DorisWrite.scala   |   2 -
 20 files changed, 918 insertions(+), 57 deletions(-)

diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml
new file mode 100644
index 0000000..2f2949b
--- /dev/null
+++ b/.github/workflows/run-e2ecase.yml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# +--- +name: Run E2ECases +on: + pull_request: + push: + +jobs: + build-extension: + name: "Run E2ECases" + runs-on: ubuntu-latest + defaults: + run: + shell: bash + steps: + - name: Checkout + uses: actions/checkout@master + + - name: Setup java + uses: actions/setup-java@v2 + with: + distribution: adopt + java-version: '8' + + - name: Run E2ECases for spark 2 + run: | + cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + + - name: Run E2ECases for spark 3.1 + run: | + cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + + - name: Run E2ECases for spark 3.3 + run: | + cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java index 2a10ffa..37d3a48 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java @@ -17,8 +17,6 @@ package org.apache.doris.spark.client.write; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.json.JsonMapper; import org.apache.doris.spark.client.DorisBackendHttpClient; import org.apache.doris.spark.client.DorisFrontendClient; import org.apache.doris.spark.client.entity.Backend; @@ -27,8 +25,12 @@ import org.apache.doris.spark.config.DorisConfig; import org.apache.doris.spark.config.DorisOptions; import org.apache.doris.spark.exception.OptionRequiredException; import org.apache.doris.spark.exception.StreamLoadException; +import org.apache.doris.spark.util.EscapeHandler; import org.apache.doris.spark.util.HttpUtils; import org.apache.doris.spark.util.URLs; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.HttpStatus; @@ -110,6 +112,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl private transient ExecutorService executor; private Future<CloseableHttpResponse> requestFuture = null; + private volatile String currentLabel; public AbstractStreamLoadProcessor(DorisConfig config) throws Exception { super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)); @@ -129,7 +132,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type")); if (properties.containsKey(GROUP_COMMIT)) { String message = ""; - if (!isTwoPhaseCommitEnabled) message = "group commit does not support two-phase commit"; + if (isTwoPhaseCommitEnabled) message = "group commit does not support two-phase commit"; if (properties.containsKey(PARTIAL_COLUMNS) && 
"true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS))) message = "group commit does not support partial column updates"; if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase())) @@ -166,6 +169,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl output.write(toArrowFormat(rs)); } output.close(); + logger.info("stream load stopped with {}", currentLabel != null ? currentLabel : "group commit"); CloseableHttpResponse res = requestFuture.get(); if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { throw new StreamLoadException("stream load execute failed, status: " + res.getStatusLine().getStatusCode() @@ -194,13 +198,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl } catch (OptionRequiredException e) { throw new RuntimeException("stream load handle commit props failed", e); } - try { - CloseableHttpResponse response = httpClient.execute(httpPut); + try(CloseableHttpResponse response = httpClient.execute(httpPut)){ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { throw new RuntimeException("commit transaction failed, transaction: " + msg + ", status: " + response.getStatusLine().getStatusCode() + ", reason: " + response.getStatusLine().getReasonPhrase()); } + logger.info("commit response: {}", EntityUtils.toString(response.getEntity())); } catch (IOException e) { throw new RuntimeException("commit transaction failed, transaction: " + msg, e); } @@ -221,13 +225,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl } catch (OptionRequiredException e) { throw new RuntimeException("stream load handle abort props failed", e); } - try { - CloseableHttpResponse response = httpClient.execute(httpPut); + try(CloseableHttpResponse response = httpClient.execute(httpPut)){ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { throw new RuntimeException("abort transaction failed, transaction: " + msg + ", status: " + response.getStatusLine().getStatusCode() + ", reason: " + response.getStatusLine().getReasonPhrase()); } + logger.info("abort response: {}", EntityUtils.toString(response.getEntity())); } catch (IOException e) { throw new RuntimeException("abort transaction failed, transaction: " + msg, e); } @@ -274,8 +278,8 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl private void handleStreamLoadProperties(HttpPut httpPut) throws OptionRequiredException { addCommonHeaders(httpPut); if (groupCommit == null || groupCommit.equals("off_mode")) { - String label = generateStreamLoadLabel(); - httpPut.setHeader("label", label); + currentLabel = generateStreamLoadLabel(); + httpPut.setHeader("label", currentLabel); } String writeFields = getWriteFields(); httpPut.setHeader("columns", writeFields); @@ -286,20 +290,12 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl switch (format.toLowerCase()) { case "csv": - if (!properties.containsKey("column_separator")) { - properties.put("column_separator", "\t"); - } - columnSeparator = properties.get("column_separator"); - if (!properties.containsKey("line_delimiter")) { - properties.put("line_delimiter", "\n"); - } - lineDelimiter = properties.get("line_delimiter"); + // Handling hidden delimiters + columnSeparator = EscapeHandler.escapeString(properties.getOrDefault("column_separator", "\t")); + lineDelimiter = EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n")); break; case "json": - if 
(!properties.containsKey("line_delimiter")) { - properties.put("line_delimiter", "\n"); - } - lineDelimiter = properties.get("line_delimiter"); + lineDelimiter = properties.getOrDefault("line_delimiter", "\n"); properties.put("read_json_by_line", "true"); break; } @@ -346,6 +342,8 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl entity = new GzipCompressingEntity(entity); } httpPut.setEntity(entity); + + logger.info("table {}.{} stream load started for {} on host {}:{}", database, table, currentLabel != null ? currentLabel : "group commit", host, port); return getExecutors().submit(() -> client.execute(httpPut)); } diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java index 2f787a5..97ef1c0 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java @@ -16,15 +16,17 @@ // under the License. package org.apache.doris.spark.client.write; +import org.apache.doris.spark.config.DorisConfig; +import org.apache.doris.spark.config.DorisOptions; +import org.apache.doris.spark.exception.OptionRequiredException; +import org.apache.doris.spark.util.RowConvertors; + import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.doris.spark.config.DorisConfig; -import org.apache.doris.spark.config.DorisOptions; -import org.apache.doris.spark.exception.OptionRequiredException; -import org.apache.doris.spark.util.RowConvertors; +import org.apache.spark.SparkContext; import org.apache.spark.TaskContext; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.arrow.ArrowWriter; diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java index 5b666f4..4319688 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java @@ -81,7 +81,7 @@ public class DorisOptions { */ public static final ConfigOption<Boolean> DORIS_SINK_TASK_USE_REPARTITION = ConfigOptions.name("doris.sink.task.use.repartition").booleanType().defaultValue(false).withDescription(""); - public static final ConfigOption<Integer> DORIS_SINK_BATCH_INTERVAL_MS = ConfigOptions.name("doris.sink.batch.interval.ms").intType().defaultValue(50).withDescription(""); + public static final ConfigOption<Integer> DORIS_SINK_BATCH_INTERVAL_MS = ConfigOptions.name("doris.sink.batch.interval.ms").intType().defaultValue(0).withDescription(""); public static final ConfigOption<Boolean> DORIS_SINK_ENABLE_2PC = ConfigOptions.name("doris.sink.enable-2pc").booleanType().defaultValue(false).withDescription(""); diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java new file mode 100644 index 0000000..436658d --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.util; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Handler for escape in properties. */ +public class EscapeHandler { + public static final String ESCAPE_DELIMITERS_FLAGS = "\\x"; + public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9|a-f|A-F]{2})"); + + public static String escapeString(String source) { + if (source.contains(ESCAPE_DELIMITERS_FLAGS)) { + Matcher m = ESCAPE_PATTERN.matcher(source); + StringBuffer buf = new StringBuffer(); + while (m.find()) { + m.appendReplacement( + buf, String.format("%s", (char) Integer.parseInt(m.group(1), 16))); + } + m.appendTail(buf); + return buf.toString(); + } + return source; + } +} diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala index 0a11c2e..90031f2 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala @@ -22,6 +22,7 @@ import org.apache.http.HttpHeaders import org.apache.http.client.methods.HttpRequestBase import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy} import org.apache.http.impl.client.{CloseableHttpClient, DefaultRedirectStrategy, HttpClients} +import org.apache.http.protocol.HttpRequestExecutor import org.apache.http.ssl.SSLContexts import java.io.{File, FileInputStream} @@ -33,9 +34,11 @@ import scala.util.{Failure, Success, Try} object HttpUtils { def getHttpClient(config: DorisConfig): CloseableHttpClient = { - val builder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy { - override def isRedirectable(method: String): Boolean = true - }) + val builder = HttpClients.custom() + .setRequestExecutor(new HttpRequestExecutor(60000)) + .setRedirectStrategy(new DefaultRedirectStrategy { + override def isRedirectable(method: String): Boolean = true + }) val enableHttps = config.getValue(DorisOptions.DORIS_ENABLE_HTTPS) if (enableHttps) { require(config.contains(DorisOptions.DORIS_HTTPS_KEY_STORE_PATH)) diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala index b75d1ce..1ae0996 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala @@ -40,7 +40,8 @@ object RowConvertors { def convertToCsv(row: InternalRow, schema: StructType, sep: String): String = { (0 until schema.length).map(i => { - asScalaValue(row, schema.fields(i).dataType, i) + val value = asScalaValue(row, schema.fields(i).dataType, i) + if (value == null) NULL_VALUE else value }).mkString(sep) } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java index 97e7e26..c9b9768 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java @@ -21,6 +21,13 @@ import org.apache.doris.spark.container.instance.ContainerService; import org.apache.doris.spark.container.instance.DorisContainer; import org.apache.doris.spark.container.instance.DorisCustomerContainer; +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -28,6 +35,8 @@ import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.util.List; import java.util.Objects; @@ -79,6 +88,10 @@ public abstract class AbstractContainerTestBase { return dorisContainerService.getPassword(); } + protected int getQueryPort() { + return dorisContainerService.getQueryPort(); + } + protected String getDorisQueryUrl() { return dorisContainerService.getJdbcUrl(); } @@ -115,4 +128,54 @@ public abstract class AbstractContainerTestBase { assertEquals(expected.size(), actual.size()); assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0])); } + + protected void faultInjectionOpen() throws IOException { + String pointName = "FlushToken.submit_flush_error"; + String apiUrl = + String.format( + "http://%s/api/debug_point/add/%s", + dorisContainerService.getBenodes(), pointName); + HttpPost httpPost = new HttpPost(apiUrl); + httpPost.addHeader( + HttpHeaders.AUTHORIZATION, + auth(dorisContainerService.getUsername(), dorisContainerService.getPassword())); + try (CloseableHttpClient httpClient = HttpClients.custom().build()) { + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + int statusCode = response.getStatusLine().getStatusCode(); + String reason = response.getStatusLine().toString(); + if (statusCode == 200 && response.getEntity() != null) { + LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity())); + } else { + 
LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason); + } + } + } + } + + protected void faultInjectionClear() throws IOException { + String apiUrl = + String.format( + "http://%s/api/debug_point/clear", dorisContainerService.getBenodes()); + HttpPost httpPost = new HttpPost(apiUrl); + httpPost.addHeader( + HttpHeaders.AUTHORIZATION, + auth(dorisContainerService.getUsername(), dorisContainerService.getPassword())); + try (CloseableHttpClient httpClient = HttpClients.custom().build()) { + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + int statusCode = response.getStatusLine().getStatusCode(); + String reason = response.getStatusLine().toString(); + if (statusCode == 200 && response.getEntity() != null) { + LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity())); + } else { + LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason); + } + } + } + } + + protected String auth(String user, String password) { + final String authInfo = user + ":" + password; + byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encoded); + } } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java index 3ec7ee5..f8ec293 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java @@ -49,4 +49,6 @@ public interface ContainerService { String getBenodes(); void close(); + + int getQueryPort(); } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java index 7c9297e..5220876 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java @@ -193,6 +193,11 @@ public class DorisContainer implements ContainerService { LOG.info("Doris container closed successfully."); } + @Override + public int getQueryPort() { + return 9030; + } + private void initializeJDBCDriver() throws MalformedURLException { URLClassLoader urlClassLoader = new URLClassLoader( diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java index 4ba4e74..4f64754 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java @@ -135,4 +135,9 @@ public class DorisCustomerContainer implements ContainerService { @Override public void close() {} + + @Override + public int getQueryPort() { + return Integer.valueOf(System.getProperty("doris_query_port")); + } } diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala new file mode 100644 index 0000000..032195d --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.sql + +import org.apache.doris.spark.container.AbstractContainerTestBase.getDorisQueryConnection +import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils} +import org.apache.spark.sql.SparkSession +import org.junit.{Before, Test} +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.slf4j.LoggerFactory + +import java.util + +object Doris2DorisE2ECase { + @Parameterized.Parameters(name = "readMode: {0}, flightSqlPort: {1}") + def parameters(): java.util.Collection[Array[AnyRef]] = { + import java.util.Arrays + Arrays.asList( + Array("thrift": java.lang.String, -1: java.lang.Integer), + Array("arrow": java.lang.String, 9611: java.lang.Integer) + ) + } +} + +/** + * Read Doris to Write Doris. + */ +@RunWith(classOf[Parameterized]) +class Doris2DorisE2ECase(readMode: String, flightSqlPort: Int) extends AbstractContainerTestBase{ + + private val LOG = LoggerFactory.getLogger(classOf[Doris2DorisE2ECase]) + val DATABASE = "test_doris_e2e" + val TABLE_READ_TBL_ALL_TYPES = "tbl_read_tbl_all_types" + val TABLE_WRITE_TBL_ALL_TYPES = "tbl_write_tbl_all_types" + + @Before + def setUp(): Unit = { + ContainerUtils.executeSQLStatement(getDorisQueryConnection, + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)) + } + + @Test + def testAllTypeE2ESQL(): Unit = { + val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) + + val targetInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/write_all_type.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, targetInitSql: _*) + + val session = SparkSession.builder().master("local[*]").getOrCreate() + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
+ TABLE_READ_TBL_ALL_TYPES}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + + session.sql( + s""" + |CREATE TEMPORARY VIEW test_sink + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_ALL_TYPES}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}" + |) + |""".stripMargin) + + session.sql( + """ + |insert into test_sink select * from test_source + |""".stripMargin) + session.stop() + + val excepted = + util.Arrays.asList( + "1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello, Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", \"key2\":\"value2\"},{\"name\": \"Tom\", \"age\": 30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}", + "2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", \"k2\":\"v2\"},{\"name\": \"Jerry\", \"age\": 25},{\"status\":\"ok\"},{\"data\":[1,2,3]}", + "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\": \"Alice\", \"age\": 40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}", + "4,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null"); + + val query = String.format("select * from %s order by id", TABLE_WRITE_TBL_ALL_TYPES) + ContainerUtils.checkResult(getDorisQueryConnection(DATABASE), LOG, excepted, query, 20, false) + } +} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala new file mode 100644 index 0000000..39df05a --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark.sql + +import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection} +import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils} +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.junit.Test +import org.slf4j.LoggerFactory + +import java.util +import scala.collection.JavaConverters._ + +/** + * it case for doris catalog. + */ +class DorisCatalogITCase extends AbstractContainerTestBase { + + private val LOG = LoggerFactory.getLogger(classOf[DorisCatalogITCase]) + private val DATABASE = "test_catalog" + private val TBL_CATALOG = "tbl_catalog" + + @Test + @throws[Exception] + def testSparkCatalog(): Unit = { + + val conf = new SparkConf() + conf.set("spark.sql.catalog.doris_catalog", "org.apache.doris.spark.catalog.DorisTableCatalog") + conf.set("spark.sql.catalog.doris_catalog.doris.fenodes", getFenodes) + conf.set("spark.sql.catalog.doris_catalog.doris.query.port", getQueryPort.toString) + conf.set("spark.sql.catalog.doris_catalog.doris.user", getDorisUsername) + conf.set("spark.sql.catalog.doris_catalog.doris.password", getDorisPassword) + val session = SparkSession.builder().config(conf).master("local[*]").getOrCreate() + + // session.sessionState.catalogManager.setCurrentCatalog("doris_catalog") + // spark 2 no catalogManager property, used reflect + try { + val stateObj = session.sessionState + val catalogManagerObj = stateObj.getClass.getMethod("catalogManager").invoke(stateObj) + val setCurrentCatalogMethod = catalogManagerObj.getClass.getMethod("setCurrentCatalog", classOf[String]) + setCurrentCatalogMethod.invoke(catalogManagerObj, "doris_catalog") + } catch { + case e: Exception => + // if Spark 2,will throw NoSuchMethodException + println("Catalog API not available, skipping catalog operations") + e.printStackTrace() + return + } + + // show databases + val showDatabaseActual = new util.ArrayList[String](session.sql("show databases").collect().map(_.getAs[String]("namespace")).toList.asJava) + showDatabaseActual.add("information_schema") + val showDatabaseExcept = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("show databases"), + 1) + checkResultInAnyOrder("testSparkCatalog", showDatabaseExcept.toArray, showDatabaseActual.toArray) + + ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)) + + // mock data + ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TBL_CATALOG), + String.format("CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int\n" + + ") " + + " DUPLICATE KEY(`name`) " + + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (" + + "\"replication_num\" = \"1\")", DATABASE, TBL_CATALOG), + String.format("insert into %s.%s values ('doris',18)", DATABASE, TBL_CATALOG), + String.format("insert into %s.%s values ('spark',10)", DATABASE, TBL_CATALOG) + ) + + // show tables + session.sql("USE " + DATABASE); + val showTablesActual = session.sql("show tables").collect().map(_.getAs[String]("tableName")).toList.asJava + val showTablesExcept = ContainerUtils.executeSQLStatement( + getDorisQueryConnection(DATABASE), + LOG, + String.format("show tables"), + 1) + checkResultInAnyOrder("testSparkCatalog", showTablesExcept.toArray, showTablesActual.toArray) + + 
val query = String.format("select * from %s.%s", DATABASE, TBL_CATALOG) + // select tables + val selectActual = session.sql(query).collect().map(i=> i.getAs[String]("name") + "," + i.getAs[Int]("age")).toList.asJava + val selectExcept = ContainerUtils.executeSQLStatement( + getDorisQueryConnection(DATABASE), + LOG, + query, + 2) + checkResultInAnyOrder("testSparkCatalog", selectExcept.toArray, selectActual.toArray) + + session.sql(String.format("desc %s",TBL_CATALOG)).show(true); + // insert tables + // todo: insert into values('') schema does not match + session.sql(String.format("insert overwrite %s.%s select 'insert-data' as name, 99 as age", DATABASE, TBL_CATALOG)) + val selectNewExcept = ContainerUtils.executeSQLStatement( + getDorisQueryConnection(DATABASE), + LOG, + query, + 2) + checkResultInAnyOrder("testSparkCatalog", selectNewExcept.toArray, util.Arrays.asList("insert-data,99").toArray) + } + + + private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = { + LOG.info("Checking DorisCatalogITCase result. testName={}, actual={}, expected={}", testName, actual, expected) + assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava) + } + +} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala index 67a0688..07c998b 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala @@ -42,6 +42,9 @@ object DorisReaderITCase { } } +/** + * it case for doris reader. + */ @RunWith(classOf[Parameterized]) class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractContainerTestBase { @@ -121,7 +124,7 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo | "user"="${getDorisUsername}", | "password"="${getDorisPassword}", | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + | "doris.fe.auto.fetch"="true" |) |""".stripMargin) diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala new file mode 100644 index 0000000..bbaf7bd --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark.sql + +import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils} +import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection} +import org.apache.doris.spark.rest.models.DataModel +import org.apache.spark.sql.SparkSession +import org.junit.{Before, Test} +import org.slf4j.LoggerFactory + +import java.util +import java.util.UUID +import java.util.concurrent.{Executors, TimeUnit} +import scala.util.control.Breaks._ +import scala.collection.JavaConverters._ + +/** + * Test DorisWriter failover. + */ +class DorisWriterFailoverITCase extends AbstractContainerTestBase { + + private val LOG = LoggerFactory.getLogger(classOf[DorisWriterFailoverITCase]) + val DATABASE = "test_doris_failover" + val TABLE_WRITE_TBL_RETRY = "tbl_write_tbl_retry" + val TABLE_WRITE_TBL_TASK_RETRY = "tbl_write_tbl_task_retry" + val TABLE_WRITE_TBL_PRECOMMIT_FAIL = "tbl_write_tbl_precommit_fail" + val TABLE_WRITE_TBL_COMMIT_FAIL = "tbl_write_tbl_commit_fail" + + @Before + def setUp(): Unit = { + ContainerUtils.executeSQLStatement(getDorisQueryConnection, + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)) + } + + @Test + def testFailoverForRetry(): Unit = { + initializeTable(TABLE_WRITE_TBL_RETRY, DataModel.DUPLICATE) + val session = SparkSession.builder().master("local[1]").getOrCreate() + val df = session.createDataFrame(Seq( + ("doris", "1234"), + ("spark", "123456"), + ("catalog", "12345678") + )).toDF("name", "address") + df.createTempView("mock_source") + + session.sql( + s""" + |CREATE TEMPORARY VIEW test_sink + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_RETRY}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.sink.batch.interval.ms"="1000", + | "doris.sink.batch.size"="1", + | "doris.sink.max-retries"="100", + | "doris.sink.enable-2pc"="false" + |) + |""".stripMargin) + + val service = Executors.newSingleThreadExecutor() + val future = service.submit(new Runnable { + override def run(): Unit = { + session.sql("INSERT INTO test_sink SELECT * FROM mock_source") + } + }) + + val query = String.format("SELECT * FROM %s.%s", DATABASE, TABLE_WRITE_TBL_RETRY) + var result: util.List[String] = null + val connection = getDorisQueryConnection(DATABASE) + breakable { + while (true) { + try { + // query may be failed + result = ContainerUtils.executeSQLStatement(connection, LOG, query, 2) + } catch { + case ex: Exception => + LOG.error("Failed to query result, cause " + ex.getMessage) + } + + // until insert 1 rows + if (result.size >= 1){ + Thread.sleep(5000) + ContainerUtils.executeSQLStatement( + connection, + LOG, + String.format("ALTER TABLE %s.%s MODIFY COLUMN address varchar(256)", DATABASE, TABLE_WRITE_TBL_RETRY)) + break + } + } + } + + future.get(60, TimeUnit.SECONDS) + session.stop() + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_WRITE_TBL_RETRY), + 2) + val expected = util.Arrays.asList("doris,1234", "spark,123456", "catalog,12345678"); + checkResultInAnyOrder("testFailoverForRetry", expected.toArray, actual.toArray) + } + + + /** + * Test failover for task retry and sink.max-retries=0 + */ + @Test + def testFailoverForTaskRetry(): Unit = { + initializeTable(TABLE_WRITE_TBL_TASK_RETRY, DataModel.DUPLICATE) + val session = SparkSession.builder().master("local[1,100]").getOrCreate() + val 
df = session.createDataFrame(Seq( + ("doris", "cn"), + ("spark", "us"), + ("catalog", "uk") + )).toDF("name", "address") + df.createTempView("mock_source") + + var uuid = UUID.randomUUID().toString + session.sql( + s""" + |CREATE TEMPORARY VIEW test_sink + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_TASK_RETRY}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.sink.batch.size"="1", + | "doris.sink.batch.interval.ms"="1000", + | "doris.sink.max-retries"="0", + | "doris.sink.enable-2pc"="true", + | "doris.sink.label.prefix"='${uuid}' + |) + |""".stripMargin) + + val service = Executors.newSingleThreadExecutor() + val future = service.submit(new Runnable { + override def run(): Unit = { + session.sql("INSERT INTO test_sink SELECT * FROM mock_source") + } + }) + + val query = "show transaction from " + DATABASE + " where label like '" + uuid + "%'" + var result: List[String] = null + val connection = getDorisQueryConnection(DATABASE) + breakable { + while (true) { + try { + // query may be failed + result = ContainerUtils.executeSQLStatement(connection, LOG, query, 15).asScala.toList + } catch { + case ex: Exception => + LOG.error("Failed to query result, cause " + ex.getMessage) + } + + // until insert 1 rows + if (result.size >= 1 && result.forall(s => s.contains("PRECOMMITTED"))){ + faultInjectionOpen() + Thread.sleep(3000) + faultInjectionClear() + break + } + } + } + + future.get(60, TimeUnit.SECONDS) + session.stop() + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_WRITE_TBL_TASK_RETRY), + 2) + val expected = util.Arrays.asList("doris,cn", "spark,us", "catalog,uk"); + checkResultInAnyOrder("testFailoverForTaskRetry", expected.toArray, actual.toArray) + } + + + private def initializeTable(table: String, dataModel: DataModel): Unit = { + val max = if (DataModel.AGGREGATE == dataModel) "MAX" else "" + val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\"" + val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString + ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), + String.format("CREATE TABLE %s.%s ( \n" + + "`name` varchar(32),\n" + + "`address` varchar(4) %s\n" + + ") " + + " %s KEY(`name`) " + + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (" + + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, max, model)) + } + + private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = { + LOG.info("Checking DorisWriterFailoverITCase result. 
testName={}, actual={}, expected={}", testName, actual, expected) + assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava) + } +} diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala index 7f1e393..51201e4 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala @@ -19,26 +19,36 @@ package org.apache.doris.spark.sql import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection} import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils} +import org.apache.doris.spark.rest.models.DataModel import org.apache.spark.sql.{SaveMode, SparkSession} import org.junit.Test import org.slf4j.LoggerFactory import java.util import scala.collection.JavaConverters._ + +/** + * it case for doris writer. + */ class DorisWriterITCase extends AbstractContainerTestBase { - private val LOG = LoggerFactory.getLogger(classOf[DorisReaderITCase]) + private val LOG = LoggerFactory.getLogger(classOf[DorisWriterITCase]) - val DATABASE: String = "test" + val DATABASE: String = "test_doris_write" val TABLE_CSV: String = "tbl_csv" + val TABLE_CSV_HIDE_SEP: String = "tbl_csv_hide_sep" + val TABLE_GROUP_COMMIT: String = "tbl_group_commit" val TABLE_JSON: String = "tbl_json" + val TABLE_JSON_EMPTY_PARTITION: String = "tbl_json_empty_partition" val TABLE_JSON_TBL: String = "tbl_json_tbl" + val TABLE_JSON_TBL_OVERWRITE: String = "tbl_json_tbl_overwrite" + val TABLE_JSON_TBL_ARROW: String = "tbl_json_tbl_arrow" @Test @throws[Exception] def testSinkCsvFormat(): Unit = { - initializeTable(TABLE_CSV) - val session = SparkSession.builder().master("local[*]").getOrCreate() + initializeTable(TABLE_CSV, DataModel.DUPLICATE) + val session = SparkSession.builder().master("local[1]").getOrCreate() val df = session.createDataFrame(Seq( ("doris_csv", 1), ("spark_csv", 2) @@ -46,17 +56,20 @@ class DorisWriterITCase extends AbstractContainerTestBase { df.write .format("doris") .option("doris.fenodes", getFenodes) + .option("doris.sink.auto-redirect", false) .option("doris.table.identifier", DATABASE + "." + TABLE_CSV) .option("user", getDorisUsername) .option("password", getDorisPassword) .option("sink.properties.column_separator", ",") .option("sink.properties.line_delimiter", "\n") .option("sink.properties.format", "csv") + .option("doris.sink.batch.interval.ms", "5000") + .option("doris.sink.batch.size", "1") .mode(SaveMode.Append) .save() session.stop() - Thread.sleep(10000) + Thread.sleep(15000) val actual = ContainerUtils.executeSQLStatement( getDorisQueryConnection, LOG, @@ -66,10 +79,136 @@ class DorisWriterITCase extends AbstractContainerTestBase { checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(), actual.toArray) } + @Test + @throws[Exception] + def testSinkCsvFormatHideSep(): Unit = { + initializeTable(TABLE_CSV_HIDE_SEP, DataModel.AGGREGATE) + val session = SparkSession.builder().master("local[*]").getOrCreate() + val df = session.createDataFrame(Seq( + ("doris_csv", 1), + ("spark_csv", 2) + )).toDF("name", "age") + df.write + .format("doris") + .option("doris.fenodes", getFenodes + "," + getFenodes) + .option("doris.table.identifier", DATABASE + "." 
+ TABLE_CSV_HIDE_SEP) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.column_separator", "\\x01") + .option("sink.properties.line_delimiter", "\\x02") + .option("sink.properties.format", "csv") + .mode(SaveMode.Append) + .save() + session.stop() + + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_CSV_HIDE_SEP), + 2) + val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") + checkResultInAnyOrder("testSinkCsvFormatHideSep", expected.toArray(), actual.toArray) + } + + @Test + @throws[Exception] + def testSinkGroupCommit(): Unit = { + initializeTable(TABLE_GROUP_COMMIT, DataModel.DUPLICATE) + val session = SparkSession.builder().master("local[*]").getOrCreate() + val df = session.createDataFrame(Seq( + ("doris_csv", 1), + ("spark_csv", 2) + )).toDF("name", "age") + df.write + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.table.identifier", DATABASE + "." + TABLE_GROUP_COMMIT) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.group_commit", "sync_mode") + .mode(SaveMode.Append) + .save() + session.stop() + + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_GROUP_COMMIT), + 2) + val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") + checkResultInAnyOrder("testSinkGroupCommit", expected.toArray(), actual.toArray) + } + + @Test + @throws[Exception] + def testSinkEmptyPartition(): Unit = { + initializeTable(TABLE_JSON_EMPTY_PARTITION, DataModel.AGGREGATE) + val session = SparkSession.builder().master("local[2]").getOrCreate() + val df = session.createDataFrame(Seq( + ("doris_json", 1) + )).toDF("name", "age") + df.repartition(2).write + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_EMPTY_PARTITION) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.read_json_by_line", "true") + .option("sink.properties.format", "json") + .option("doris.sink.auto-redirect", "false") + .option("doris.sink.enable-2pc", "true") + .mode(SaveMode.Append) + .save() + session.stop() + + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON_EMPTY_PARTITION), + 2) + val expected = util.Arrays.asList("doris_json,1"); + checkResultInAnyOrder("testSinkEmptyPartition", expected.toArray, actual.toArray) + } + + @Test + @throws[Exception] + def testSinkArrowFormat(): Unit = { + initializeTable(TABLE_JSON_TBL_ARROW, DataModel.DUPLICATE) + val session = SparkSession.builder().master("local[*]").getOrCreate() + val df = session.createDataFrame(Seq( + ("doris_json", 1), + ("spark_json", 2) + )).toDF("name", "age") + df.write + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.table.identifier", DATABASE + "." 
+ TABLE_JSON_TBL_ARROW) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.format", "arrow") + .option("doris.sink.batch.size", "1") + .option("doris.sink.enable-2pc", "true") + .mode(SaveMode.Append) + .save() + session.stop() + + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_ARROW), + 2) + val expected = util.Arrays.asList("doris_json,1", "spark_json,2"); + checkResultInAnyOrder("testSinkArrowFormat", expected.toArray, actual.toArray) + } + @Test @throws[Exception] def testSinkJsonFormat(): Unit = { - initializeTable(TABLE_JSON) + initializeTable(TABLE_JSON, DataModel.UNIQUE) val session = SparkSession.builder().master("local[*]").getOrCreate() val df = session.createDataFrame(Seq( ("doris_json", 1), @@ -101,7 +240,7 @@ class DorisWriterITCase extends AbstractContainerTestBase { @Test @throws[Exception] def testSQLSinkFormat(): Unit = { - initializeTable(TABLE_JSON_TBL) + initializeTable(TABLE_JSON_TBL, DataModel.UNIQUE_MOR) val session = SparkSession.builder().master("local[*]").getOrCreate() val df = session.createDataFrame(Seq( ("doris_tbl", 1), @@ -135,25 +274,74 @@ class DorisWriterITCase extends AbstractContainerTestBase { checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, actual.toArray) } - + @Test @throws[Exception] - private def initializeTable(table: String): Unit = { + def testSQLSinkOverwrite(): Unit = { + initializeTable(TABLE_JSON_TBL_OVERWRITE, DataModel.DUPLICATE) + // init history data + ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("insert into %s.%s values ('history-doris',1118)", DATABASE, TABLE_JSON_TBL_OVERWRITE), + String.format("insert into %s.%s values ('history-spark',1110)", DATABASE, TABLE_JSON_TBL_OVERWRITE)) + + val session = SparkSession.builder().master("local[*]").getOrCreate() + val df = session.createDataFrame(Seq( + ("doris_tbl", 1), + ("spark_tbl", 2) + )).toDF("name", "age") + df.createTempView("mock_source") + session.sql( + s""" + |CREATE TEMPORARY VIEW test_sink + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
+ TABLE_JSON_TBL_OVERWRITE}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.query.port"="${getQueryPort}", + | "doris.sink.label.prefix"="doris-label-customer", + | "doris.sink.enable-2pc"="true" + |) + |""".stripMargin) + session.sql( + """ + |insert overwrite table test_sink select name,age from mock_source + |""".stripMargin) + session.stop() + + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_OVERWRITE), + 2) + val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2"); + checkResultInAnyOrder("testSQLSinkOverwrite", expected.toArray, actual.toArray) + } + + private def initializeTable(table: String, dataModel: DataModel): Unit = { + val max = if (DataModel.AGGREGATE == dataModel) "MAX" else "" + val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\"" + val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString ContainerUtils.executeSQLStatement( getDorisQueryConnection, LOG, String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), - String.format( - "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" + ") " + - "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + - "PROPERTIES (\n" + - "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table) - ) + String.format("CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int %s\n" + + ") " + + " %s KEY(`name`) " + + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + "PROPERTIES (" + + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, max, model)) } private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = { - LOG.info("Checking DorisSourceITCase result. testName={}, actual={}, expected={}", testName, actual, expected) + LOG.info("Checking DorisWriterFailoverITCase result. 
testName={}, actual={}, expected={}", testName, actual, expected) assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava) } - } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql new file mode 100644 index 0000000..967bdf5 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql @@ -0,0 +1,31 @@ +DROP TABLE IF EXISTS tbl_write_tbl_all_types; + +CREATE TABLE tbl_write_tbl_all_types ( +`id` int, +`c1` boolean, +`c2` tinyint, +`c3` smallint, +`c4` int, +`c5` bigint, +`c6` largeint, +`c7` float, +`c8` double, +`c9` decimal(12,4), +`c10` date, +`c11` datetime, +`c12` char(1), +`c13` varchar(256), +`c14` string, +`c15` Array<String>, +`c16` Map<String, String>, +`c17` Struct<name: String, age: int>, +`c18` JSON, +`c19` VARIANT +) +DUPLICATE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 2 +PROPERTIES ( +"replication_num" = "1", +"light_schema_change" = "true" +); + diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties similarity index 76% rename from spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties rename to spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties index ecb73d3..de6bfd5 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties +++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties @@ -16,8 +16,11 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=INFO, console +rootLogger.level = info +rootLogger.appenderRef.stdout.ref = console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c [%t] %x - %m%n \ No newline at end of file +appender.console.type = Console +appender.console.name = console +appender.console.target = SYSTEM_ERR +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p [%t] %c{1}: %m%n%ex diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala index f4ff49f..6628e9a 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala @@ -17,7 +17,6 @@ package org.apache.doris.spark.write -import org.apache.commons.lang3.StringUtils import org.apache.doris.spark.client.write.{CopyIntoProcessor, DorisCommitter, DorisWriter, StreamLoadProcessor} import org.apache.doris.spark.config.{DorisConfig, DorisOptions} import org.apache.doris.spark.util.Retry @@ -60,7 +59,7 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int, if (txnId.isDefined) { committedMessages += txnId.get } else { - throw new Exception("Failed to commit batch") + log.warn("No txn {} to commit batch", txnId) } 
       }
       DorisWriterCommitMessage(partitionId, taskId, epochId, committedMessages.toArray)
@@ -106,7 +105,7 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
           recordBuffer.clear()
         }
         writer.resetBatchCount()
-        LockSupport.parkNanos(batchIntervalMs.toLong)
+        LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs.toLong).toNanos)
       }
       writer.load(record)
     } {
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
index cf2914f..e5fddaf 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
@@ -53,9 +53,7 @@ class DorisWrite(config: DorisConfig, schema: StructType) extends BatchWrite wit
   // for batch write
   override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = {
     LOG.info("writerCommitMessages size: " + writerCommitMessages.length)
-    writerCommitMessages.foreach(x => println(x))
     if (writerCommitMessages.exists(_ != null) && writerCommitMessages.nonEmpty) {
-      writerCommitMessages.foreach(x => println(x))
       writerCommitMessages.filter(_ != null)
         .foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.abort))
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org