This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f07e8f  [improve] add doris sink itcase (#297)
5f07e8f is described below

commit 5f07e8fca0598e9fc38720c2da4d0816440c0e3c
Author: wudi <676366...@qq.com>
AuthorDate: Mon Mar 31 09:57:26 2025 +0800

    [improve] add doris sink itcase (#297)
---
 .github/workflows/run-e2ecase.yml                  |  52 +++++
 .../client/write/AbstractStreamLoadProcessor.java  |  40 ++--
 .../spark/client/write/StreamLoadProcessor.java    |  10 +-
 .../apache/doris/spark/config/DorisOptions.java    |   2 +-
 .../org/apache/doris/spark/util/EscapeHandler.java |  41 ++++
 .../org/apache/doris/spark/util/HttpUtils.scala    |   9 +-
 .../apache/doris/spark/util/RowConvertors.scala    |   3 +-
 .../spark/container/AbstractContainerTestBase.java |  63 ++++++
 .../spark/container/instance/ContainerService.java |   2 +
 .../spark/container/instance/DorisContainer.java   |   5 +
 .../container/instance/DorisCustomerContainer.java |   5 +
 .../doris/spark/sql/Doris2DorisE2ECase.scala       | 110 ++++++++++
 .../doris/spark/sql/DorisCatalogITCase.scala       | 137 +++++++++++++
 .../apache/doris/spark/sql/DorisReaderITCase.scala |   5 +-
 .../spark/sql/DorisWriterFailoverITCase.scala      | 220 ++++++++++++++++++++
 .../apache/doris/spark/sql/DorisWriterITCase.scala | 222 +++++++++++++++++++--
 .../resources/container/ddl/write_all_type.sql     |  31 +++
 .../{log4j.properties => log4j2-test.properties}   |  11 +-
 .../apache/doris/spark/write/DorisDataWriter.scala |   5 +-
 .../org/apache/doris/spark/write/DorisWrite.scala  |   2 -
 20 files changed, 918 insertions(+), 57 deletions(-)

diff --git a/.github/workflows/run-e2ecase.yml 
b/.github/workflows/run-e2ecase.yml
new file mode 100644
index 0000000..2f2949b
--- /dev/null
+++ b/.github/workflows/run-e2ecase.yml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Run E2ECases
+on:
+  pull_request:
+  push:
+
+jobs:
+  build-extension:
+    name: "Run E2ECases"
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        shell: bash
+    steps:
+    - name: Checkout
+      uses: actions/checkout@master
+
+    - name: Setup java
+      uses: actions/setup-java@v2
+      with:
+        distribution: adopt
+        java-version: '8'
+ 
+    - name: Run E2ECases for spark 2
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 
-pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" 
-Dimage="apache/doris:doris-all-in-one-2.1.0"
+
+    - name: Run E2ECases for spark 3.1
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl 
spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" 
-Dimage="apache/doris:doris-all-in-one-2.1.0"
+
+    - name: Run E2ECases for spark 3.3
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl 
spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" 
-Dimage="apache/doris:doris-all-in-one-2.1.0"
+    
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 2a10ffa..37d3a48 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.spark.client.write;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.json.JsonMapper;
 import org.apache.doris.spark.client.DorisBackendHttpClient;
 import org.apache.doris.spark.client.DorisFrontendClient;
 import org.apache.doris.spark.client.entity.Backend;
@@ -27,8 +25,12 @@ import org.apache.doris.spark.config.DorisConfig;
 import org.apache.doris.spark.config.DorisOptions;
 import org.apache.doris.spark.exception.OptionRequiredException;
 import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.util.EscapeHandler;
 import org.apache.doris.spark.util.HttpUtils;
 import org.apache.doris.spark.util.URLs;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
@@ -110,6 +112,7 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
     private transient ExecutorService executor;
 
     private Future<CloseableHttpResponse> requestFuture = null;
+    private volatile String currentLabel;
 
     public AbstractStreamLoadProcessor(DorisConfig config) throws Exception {
         super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
@@ -129,7 +132,7 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
         this.isGzipCompressionEnabled = 
properties.containsKey("compress_type") && 
"gzip".equals(properties.get("compress_type"));
         if (properties.containsKey(GROUP_COMMIT)) {
             String message = "";
-            if (!isTwoPhaseCommitEnabled) message = "group commit does not 
support two-phase commit";
+            if (isTwoPhaseCommitEnabled) message = "group commit does not 
support two-phase commit";
             if (properties.containsKey(PARTIAL_COLUMNS) && 
"true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS)))
                 message = "group commit does not support partial column 
updates";
             if 
(!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase()))
@@ -166,6 +169,7 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                 output.write(toArrowFormat(rs));
             }
             output.close();
+            logger.info("stream load stopped with {}", currentLabel != null ? 
currentLabel : "group commit");
             CloseableHttpResponse res = requestFuture.get();
             if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                 throw new StreamLoadException("stream load execute failed, 
status: " + res.getStatusLine().getStatusCode()
@@ -194,13 +198,13 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                 } catch (OptionRequiredException e) {
                     throw new RuntimeException("stream load handle commit 
props failed", e);
                 }
-                try {
-                    CloseableHttpResponse response = 
httpClient.execute(httpPut);
+                try(CloseableHttpResponse response = 
httpClient.execute(httpPut)){
                     if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
                         throw new RuntimeException("commit transaction failed, 
transaction: " + msg
                                 + ", status: " + 
response.getStatusLine().getStatusCode()
                                 + ", reason: " + 
response.getStatusLine().getReasonPhrase());
                     }
+                    logger.info("commit response: {}", 
EntityUtils.toString(response.getEntity()));
                 } catch (IOException e) {
                     throw new RuntimeException("commit transaction failed, 
transaction: " + msg, e);
                 }
@@ -221,13 +225,13 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                 } catch (OptionRequiredException e) {
                     throw new RuntimeException("stream load handle abort props 
failed", e);
                 }
-                try {
-                    CloseableHttpResponse response = 
httpClient.execute(httpPut);
+                try(CloseableHttpResponse response = 
httpClient.execute(httpPut)){
                     if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
                         throw new RuntimeException("abort transaction failed, 
transaction: " + msg
                                 + ", status: " + 
response.getStatusLine().getStatusCode()
                                 + ", reason: " + 
response.getStatusLine().getReasonPhrase());
                     }
+                    logger.info("abort response: {}", 
EntityUtils.toString(response.getEntity()));
                 } catch (IOException e) {
                     throw new RuntimeException("abort transaction failed, 
transaction: " + msg, e);
                 }
@@ -274,8 +278,8 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
     private void handleStreamLoadProperties(HttpPut httpPut) throws 
OptionRequiredException {
         addCommonHeaders(httpPut);
         if (groupCommit == null || groupCommit.equals("off_mode")) {
-            String label = generateStreamLoadLabel();
-            httpPut.setHeader("label", label);
+            currentLabel = generateStreamLoadLabel();
+            httpPut.setHeader("label", currentLabel);
         }
         String writeFields = getWriteFields();
         httpPut.setHeader("columns", writeFields);
@@ -286,20 +290,12 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
 
         switch (format.toLowerCase()) {
             case "csv":
-                if (!properties.containsKey("column_separator")) {
-                    properties.put("column_separator", "\t");
-                }
-                columnSeparator = properties.get("column_separator");
-                if (!properties.containsKey("line_delimiter")) {
-                    properties.put("line_delimiter", "\n");
-                }
-                lineDelimiter = properties.get("line_delimiter");
+                // handle hidden delimiters given as \xNN escape sequences (e.g. \x01)
+                columnSeparator = 
EscapeHandler.escapeString(properties.getOrDefault("column_separator", "\t"));
+                lineDelimiter = 
EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n"));
                 break;
             case "json":
-                if (!properties.containsKey("line_delimiter")) {
-                    properties.put("line_delimiter", "\n");
-                }
-                lineDelimiter = properties.get("line_delimiter");
+                lineDelimiter = properties.getOrDefault("line_delimiter", 
"\n");
                 properties.put("read_json_by_line", "true");
                 break;
         }
@@ -346,6 +342,8 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
             entity = new GzipCompressingEntity(entity);
         }
         httpPut.setEntity(entity);
+
+        logger.info("table {}.{} stream load started for {} on host {}:{}", 
database, table, currentLabel != null ? currentLabel : "group commit", host, 
port);
         return getExecutors().submit(() -> client.execute(httpPut));
     }
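
For reference, a minimal standalone sketch (not part of this commit; the URL and client setup are placeholders) of the try-with-resources pattern the commit and abort paths above now use, which guarantees the HTTP response is closed even when the status check throws:

    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;

    public class ResponseClosingSketch {
        public static void main(String[] args) throws Exception {
            String url = "http://127.0.0.1:8030/api/bootstrap"; // placeholder endpoint
            try (CloseableHttpClient httpClient = HttpClients.createDefault();
                 // both client and response are closed even if the check below throws
                 CloseableHttpResponse response = httpClient.execute(new HttpGet(url))) {
                if (response.getStatusLine().getStatusCode() != 200) {
                    throw new RuntimeException("request failed, status: "
                            + response.getStatusLine().getStatusCode());
                }
                System.out.println(EntityUtils.toString(response.getEntity()));
            }
        }
    }
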
 
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
index 2f787a5..97ef1c0 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
@@ -16,15 +16,17 @@
 // under the License.
 package org.apache.doris.spark.client.write;
 
+import org.apache.doris.spark.config.DorisConfig;
+import org.apache.doris.spark.config.DorisOptions;
+import org.apache.doris.spark.exception.OptionRequiredException;
+import org.apache.doris.spark.util.RowConvertors;
+
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.doris.spark.config.DorisConfig;
-import org.apache.doris.spark.config.DorisOptions;
-import org.apache.doris.spark.exception.OptionRequiredException;
-import org.apache.doris.spark.util.RowConvertors;
+import org.apache.spark.SparkContext;
 import org.apache.spark.TaskContext;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.arrow.ArrowWriter;
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 5b666f4..4319688 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -81,7 +81,7 @@ public class DorisOptions {
      */
     public static final ConfigOption<Boolean> DORIS_SINK_TASK_USE_REPARTITION 
= 
ConfigOptions.name("doris.sink.task.use.repartition").booleanType().defaultValue(false).withDescription("");
 
-    public static final ConfigOption<Integer> DORIS_SINK_BATCH_INTERVAL_MS = 
ConfigOptions.name("doris.sink.batch.interval.ms").intType().defaultValue(50).withDescription("");
+    public static final ConfigOption<Integer> DORIS_SINK_BATCH_INTERVAL_MS = 
ConfigOptions.name("doris.sink.batch.interval.ms").intType().defaultValue(0).withDescription("");
 
     public static final ConfigOption<Boolean> DORIS_SINK_ENABLE_2PC = 
ConfigOptions.name("doris.sink.enable-2pc").booleanType().defaultValue(false).withDescription("");
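
Since the default of doris.sink.batch.interval.ms drops from 50 to 0 above, the sink no longer pauses between batches unless a job opts back in. A minimal sketch (the table identifier, FE address and credentials are placeholders, not from this commit) of setting the interval explicitly on a Spark write:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class BatchIntervalSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
            Dataset<Row> df = spark.sql("SELECT 'doris' AS name, 18 AS age"); // placeholder data
            df.write()
                    .format("doris")
                    .option("doris.table.identifier", "test.tbl_example")   // placeholder table
                    .option("doris.fenodes", "127.0.0.1:8030")              // placeholder FE address
                    .option("user", "root")
                    .option("password", "")
                    .option("doris.sink.batch.interval.ms", "1000")         // pause 1s between flushed batches
                    .mode(SaveMode.Append)
                    .save();
            spark.stop();
        }
    }
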
 
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
new file mode 100644
index 0000000..436658d
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Handler for escape sequences (\xNN) in string properties. */
+public class EscapeHandler {
+    public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";
+    public static final Pattern ESCAPE_PATTERN = 
Pattern.compile("\\\\x([0-9a-fA-F]{2})");
+
+    public static String escapeString(String source) {
+        if (source.contains(ESCAPE_DELIMITERS_FLAGS)) {
+            Matcher m = ESCAPE_PATTERN.matcher(source);
+            StringBuffer buf = new StringBuffer();
+            while (m.find()) {
+                m.appendReplacement(
+                        buf, String.format("%s", (char) 
Integer.parseInt(m.group(1), 16)));
+            }
+            m.appendTail(buf);
+            return buf.toString();
+        }
+        return source;
+    }
+}
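
A small standalone sketch (not part of the commit) of what escapeString does with the \xNN escape syntax used for invisible CSV delimiters: "\\x01" becomes the single control character 0x01, while ordinary separators pass through unchanged:

    import org.apache.doris.spark.util.EscapeHandler;

    public class EscapeHandlerSketch {
        public static void main(String[] args) {
            // "\\x01" in a properties value denotes the non-printable SOH character (0x01)
            String sep = EscapeHandler.escapeString("\\x01");
            System.out.println((int) sep.charAt(0));              // prints 1
            // plain separators are returned unchanged
            System.out.println(EscapeHandler.escapeString(","));  // prints ,
        }
    }
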
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
index 0a11c2e..90031f2 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
@@ -22,6 +22,7 @@ import org.apache.http.HttpHeaders
 import org.apache.http.client.methods.HttpRequestBase
 import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy}
 import org.apache.http.impl.client.{CloseableHttpClient, 
DefaultRedirectStrategy, HttpClients}
+import org.apache.http.protocol.HttpRequestExecutor
 import org.apache.http.ssl.SSLContexts
 
 import java.io.{File, FileInputStream}
@@ -33,9 +34,11 @@ import scala.util.{Failure, Success, Try}
 object HttpUtils {
 
   def getHttpClient(config: DorisConfig): CloseableHttpClient = {
-    val builder = HttpClients.custom().setRedirectStrategy(new 
DefaultRedirectStrategy {
-      override def isRedirectable(method: String): Boolean = true
-    })
+    val builder = HttpClients.custom()
+      .setRequestExecutor(new HttpRequestExecutor(60000))
+      .setRedirectStrategy(new DefaultRedirectStrategy {
+        override def isRedirectable(method: String): Boolean = true
+      })
     val enableHttps = config.getValue(DorisOptions.DORIS_ENABLE_HTTPS)
     if (enableHttps) {
       require(config.contains(DorisOptions.DORIS_HTTPS_KEY_STORE_PATH))
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index b75d1ce..1ae0996 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -40,7 +40,8 @@ object RowConvertors {
 
   def convertToCsv(row: InternalRow, schema: StructType, sep: String): String 
= {
     (0 until schema.length).map(i => {
-      asScalaValue(row, schema.fields(i).dataType, i)
+      val value = asScalaValue(row, schema.fields(i).dataType, i)
+      if (value == null) NULL_VALUE else value
     }).mkString(sep)
   }
 
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
index 97e7e26..c9b9768 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
@@ -21,6 +21,13 @@ import 
org.apache.doris.spark.container.instance.ContainerService;
 import org.apache.doris.spark.container.instance.DorisContainer;
 import org.apache.doris.spark.container.instance.DorisCustomerContainer;
 
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -28,6 +35,8 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.util.List;
 import java.util.Objects;
@@ -79,6 +88,10 @@ public abstract class AbstractContainerTestBase {
         return dorisContainerService.getPassword();
     }
 
+    protected int getQueryPort() {
+        return dorisContainerService.getQueryPort();
+    }
+
     protected String getDorisQueryUrl() {
         return dorisContainerService.getJdbcUrl();
     }
@@ -115,4 +128,54 @@ public abstract class AbstractContainerTestBase {
         assertEquals(expected.size(), actual.size());
         assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new 
Object[0]));
     }
+
+    protected void faultInjectionOpen() throws IOException {
+        String pointName = "FlushToken.submit_flush_error";
+        String apiUrl =
+                String.format(
+                        "http://%s/api/debug_point/add/%s",
+                        dorisContainerService.getBenodes(), pointName);
+        HttpPost httpPost = new HttpPost(apiUrl);
+        httpPost.addHeader(
+                HttpHeaders.AUTHORIZATION,
+                auth(dorisContainerService.getUsername(), 
dorisContainerService.getPassword()));
+        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+            try (CloseableHttpResponse response = 
httpClient.execute(httpPost)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                String reason = response.getStatusLine().toString();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    LOG.info("Debug point response {}", 
EntityUtils.toString(response.getEntity()));
+                } else {
+                    LOG.info("Debug point failed, statusCode: {}, reason: {}", 
statusCode, reason);
+                }
+            }
+        }
+    }
+
+    protected void faultInjectionClear() throws IOException {
+        String apiUrl =
+                String.format(
+                        "http://%s/api/debug_point/clear",
dorisContainerService.getBenodes());
+        HttpPost httpPost = new HttpPost(apiUrl);
+        httpPost.addHeader(
+                HttpHeaders.AUTHORIZATION,
+                auth(dorisContainerService.getUsername(), 
dorisContainerService.getPassword()));
+        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+            try (CloseableHttpResponse response = 
httpClient.execute(httpPost)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                String reason = response.getStatusLine().toString();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    LOG.info("Debug point response {}", 
EntityUtils.toString(response.getEntity()));
+                } else {
+                    LOG.info("Debug point failed, statusCode: {}, reason: {}", 
statusCode, reason);
+                }
+            }
+        }
+    }
+
+    protected String auth(String user, String password) {
+        final String authInfo = user + ":" + password;
+        byte[] encoded = 
Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + new String(encoded);
+    }
 }
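
The fault-injection helpers above call the BE debug_point API with the Basic header built by auth(); a standalone sketch of the same encoding (the credentials here are illustrative):

    import org.apache.commons.codec.binary.Base64;

    import java.nio.charset.StandardCharsets;

    public class BasicAuthSketch {
        public static void main(String[] args) {
            String user = "root";      // illustrative credentials
            String password = "";
            String authInfo = user + ":" + password;
            byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
            System.out.println("Basic " + new String(encoded, StandardCharsets.UTF_8)); // Basic cm9vdDo=
        }
    }
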
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
index 3ec7ee5..f8ec293 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
@@ -49,4 +49,6 @@ public interface ContainerService {
     String getBenodes();
 
     void close();
+
+    int getQueryPort();
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
index 7c9297e..5220876 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
@@ -193,6 +193,11 @@ public class DorisContainer implements ContainerService {
         LOG.info("Doris container closed successfully.");
     }
 
+    @Override
+    public int getQueryPort() {
+        return 9030;
+    }
+
     private void initializeJDBCDriver() throws MalformedURLException {
         URLClassLoader urlClassLoader =
                 new URLClassLoader(
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
index 4ba4e74..4f64754 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
@@ -135,4 +135,9 @@ public class DorisCustomerContainer implements 
ContainerService {
 
     @Override
     public void close() {}
+
+    @Override
+    public int getQueryPort() {
+        return Integer.valueOf(System.getProperty("doris_query_port"));
+    }
 }
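
getQueryPort() above requires the doris_query_port system property to be set (for example -Ddoris_query_port=9030 on the test JVM). A small defensive variant, sketched here with an assumed fallback of 9030 (the port the default DorisContainer returns), would be:

    public class QueryPortSketch {
        public static void main(String[] args) {
            // fall back to 9030 instead of failing with a NumberFormatException when the property is absent
            int queryPort = Integer.parseInt(System.getProperty("doris_query_port", "9030"));
            System.out.println(queryPort);
        }
    }
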
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala
new file mode 100644
index 0000000..032195d
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import 
org.apache.doris.spark.container.AbstractContainerTestBase.getDorisQueryConnection
+import org.apache.doris.spark.container.{AbstractContainerTestBase, 
ContainerUtils}
+import org.apache.spark.sql.SparkSession
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.slf4j.LoggerFactory
+
+import java.util
+
+object Doris2DorisE2ECase {
+  @Parameterized.Parameters(name = "readMode: {0}, flightSqlPort: {1}")
+  def parameters(): java.util.Collection[Array[AnyRef]] = {
+    import java.util.Arrays
+    Arrays.asList(
+      Array("thrift": java.lang.String, -1: java.lang.Integer),
+      Array("arrow": java.lang.String, 9611: java.lang.Integer)
+    )
+  }
+}
+
+/**
+ * Reads from Doris and writes the result back to Doris.
+ */
+@RunWith(classOf[Parameterized])
+class Doris2DorisE2ECase(readMode: String, flightSqlPort: Int) extends 
AbstractContainerTestBase {
+
+  private val LOG = LoggerFactory.getLogger(classOf[Doris2DorisE2ECase])
+  val DATABASE = "test_doris_e2e"
+  val TABLE_READ_TBL_ALL_TYPES = "tbl_read_tbl_all_types"
+  val TABLE_WRITE_TBL_ALL_TYPES = "tbl_write_tbl_all_types"
+
+  @Before
+  def setUp(): Unit = {
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+  }
+
+  @Test
+  def testAllTypeE2ESQL(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+
+    val targetInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/write_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
targetInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_source
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+         |)
+         |""".stripMargin)
+
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_sink
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_ALL_TYPES}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}"
+         |)
+         |""".stripMargin)
+
+    session.sql(
+      """
+        |insert into test_sink select * from test_source
+        |""".stripMargin)
+    session.stop()
+
+    val expected =
+      util.Arrays.asList(
+        
"1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello,
 Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", 
\"key2\":\"value2\"},{\"name\": \"Tom\", \"age\": 
30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}",
+        
"2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris
 Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", 
\"k2\":\"v2\"},{\"name\": \"Jerry\", \"age\": 
25},{\"status\":\"ok\"},{\"data\":[1,2,3]}",
+        "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test 
Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\": 
\"Alice\", \"age\": 
40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}",
+        
"4,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null");
+
+    val query = String.format("select * from %s order by id", 
TABLE_WRITE_TBL_ALL_TYPES)
+    ContainerUtils.checkResult(getDorisQueryConnection(DATABASE), LOG, 
expected, query, 20, false)
+  }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala
new file mode 100644
index 0000000..39df05a
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import 
org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder,
 getDorisQueryConnection}
+import org.apache.doris.spark.container.{AbstractContainerTestBase, 
ContainerUtils}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+import org.slf4j.LoggerFactory
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * Integration test (IT) case for the Doris catalog.
+ */
+class DorisCatalogITCase extends AbstractContainerTestBase {
+
+  private val LOG = LoggerFactory.getLogger(classOf[DorisCatalogITCase])
+  private val DATABASE = "test_catalog"
+  private val TBL_CATALOG = "tbl_catalog"
+
+  @Test
+  @throws[Exception]
+  def testSparkCatalog(): Unit = {
+
+    val conf = new SparkConf()
+    conf.set("spark.sql.catalog.doris_catalog", 
"org.apache.doris.spark.catalog.DorisTableCatalog")
+    conf.set("spark.sql.catalog.doris_catalog.doris.fenodes", getFenodes)
+    conf.set("spark.sql.catalog.doris_catalog.doris.query.port", 
getQueryPort.toString)
+    conf.set("spark.sql.catalog.doris_catalog.doris.user", getDorisUsername)
+    conf.set("spark.sql.catalog.doris_catalog.doris.password", 
getDorisPassword)
+    val session = 
SparkSession.builder().config(conf).master("local[*]").getOrCreate()
+
+    // session.sessionState.catalogManager.setCurrentCatalog("doris_catalog")
+    // Spark 2 has no catalogManager property, so use reflection instead
+    try {
+      val stateObj = session.sessionState
+      val catalogManagerObj = 
stateObj.getClass.getMethod("catalogManager").invoke(stateObj)
+      val setCurrentCatalogMethod = 
catalogManagerObj.getClass.getMethod("setCurrentCatalog", classOf[String])
+      setCurrentCatalogMethod.invoke(catalogManagerObj, "doris_catalog")
+    } catch {
+      case e: Exception =>
+        // on Spark 2 this will throw NoSuchMethodException
+        println("Catalog API not available, skipping catalog operations")
+        e.printStackTrace()
+        return
+    }
+
+    // show databases
+    val showDatabaseActual = new util.ArrayList[String](session.sql("show 
databases").collect().map(_.getAs[String]("namespace")).toList.asJava)
+    showDatabaseActual.add("information_schema")
+    val showDatabaseExcept = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("show databases"),
+      1)
+    checkResultInAnyOrder("testSparkCatalog", showDatabaseExcept.toArray, 
showDatabaseActual.toArray)
+
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+
+    // mock data
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+      String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TBL_CATALOG),
+      String.format("CREATE TABLE %s.%s ( \n"
+        + "`name` varchar(256),\n"
+        + "`age` int\n"
+        + ") "
+        + " DUPLICATE KEY(`name`) "
+        + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+        + "PROPERTIES ("
+        + "\"replication_num\" = \"1\")", DATABASE, TBL_CATALOG),
+      String.format("insert into %s.%s  values ('doris',18)", DATABASE, 
TBL_CATALOG),
+      String.format("insert into %s.%s  values ('spark',10)", DATABASE, 
TBL_CATALOG)
+    )
+
+    // show tables
+    session.sql("USE " + DATABASE);
+    val showTablesActual = session.sql("show 
tables").collect().map(_.getAs[String]("tableName")).toList.asJava
+    val showTablesExcept = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection(DATABASE),
+      LOG,
+      String.format("show tables"),
+      1)
+    checkResultInAnyOrder("testSparkCatalog", showTablesExcept.toArray, 
showTablesActual.toArray)
+
+    val query = String.format("select * from %s.%s", DATABASE, TBL_CATALOG)
+    // select tables
+    val selectActual = session.sql(query).collect().map(i=> 
i.getAs[String]("name") + "," + i.getAs[Int]("age")).toList.asJava
+    val selectExcept = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection(DATABASE),
+      LOG,
+      query,
+      2)
+    checkResultInAnyOrder("testSparkCatalog", selectExcept.toArray, 
selectActual.toArray)
+
+    session.sql(String.format("desc %s",TBL_CATALOG)).show(true);
+    // insert tables
+    // todo: insert into values('') schema does not match
+    session.sql(String.format("insert overwrite %s.%s select 'insert-data' as 
name, 99 as age", DATABASE, TBL_CATALOG))
+    val selectNewExcept = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection(DATABASE),
+      LOG,
+      query,
+      2)
+    checkResultInAnyOrder("testSparkCatalog", selectNewExcept.toArray, 
util.Arrays.asList("insert-data,99").toArray)
+  }
+
+
+  private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], 
actual: Array[AnyRef]): Unit = {
+    LOG.info("Checking DorisCatalogITCase result. testName={}, actual={}, 
expected={}", testName, actual, expected)
+    assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
+  }
+
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 67a0688..07c998b 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -42,6 +42,9 @@ object DorisReaderITCase {
   }
 }
 
+/**
+ * Integration test (IT) case for the Doris reader.
+ */
 @RunWith(classOf[Parameterized])
 class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends 
AbstractContainerTestBase {
 
@@ -121,7 +124,7 @@ class DorisReaderITCase(readMode: String, flightSqlPort: 
Int) extends AbstractCo
          | "user"="${getDorisUsername}",
          | "password"="${getDorisPassword}",
          | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+         | "doris.fe.auto.fetch"="true"
          |)
          |""".stripMargin)
 
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
new file mode 100644
index 0000000..bbaf7bd
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.container.{AbstractContainerTestBase, 
ContainerUtils}
+import 
org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder,
 getDorisQueryConnection}
+import org.apache.doris.spark.rest.models.DataModel
+import org.apache.spark.sql.SparkSession
+import org.junit.{Before, Test}
+import org.slf4j.LoggerFactory
+
+import java.util
+import java.util.UUID
+import java.util.concurrent.{Executors, TimeUnit}
+import scala.util.control.Breaks._
+import scala.collection.JavaConverters._
+
+/**
+ * Test DorisWriter failover.
+ */
+class DorisWriterFailoverITCase extends AbstractContainerTestBase {
+
+  private val LOG = LoggerFactory.getLogger(classOf[DorisWriterFailoverITCase])
+  val DATABASE = "test_doris_failover"
+  val TABLE_WRITE_TBL_RETRY = "tbl_write_tbl_retry"
+  val TABLE_WRITE_TBL_TASK_RETRY = "tbl_write_tbl_task_retry"
+  val TABLE_WRITE_TBL_PRECOMMIT_FAIL = "tbl_write_tbl_precommit_fail"
+  val TABLE_WRITE_TBL_COMMIT_FAIL = "tbl_write_tbl_commit_fail"
+
+  @Before
+  def setUp(): Unit = {
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+  }
+
+  @Test
+  def testFailoverForRetry(): Unit = {
+    initializeTable(TABLE_WRITE_TBL_RETRY, DataModel.DUPLICATE)
+    val session = SparkSession.builder().master("local[1]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("doris", "1234"),
+      ("spark", "123456"),
+      ("catalog", "12345678")
+    )).toDF("name", "address")
+    df.createTempView("mock_source")
+
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_sink
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_RETRY}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.sink.batch.interval.ms"="1000",
+         | "doris.sink.batch.size"="1",
+         | "doris.sink.max-retries"="100",
+         | "doris.sink.enable-2pc"="false"
+         |)
+         |""".stripMargin)
+
+    val service = Executors.newSingleThreadExecutor()
+    val future = service.submit(new Runnable {
+      override def run(): Unit = {
+        session.sql("INSERT INTO test_sink SELECT * FROM mock_source")
+      }
+    })
+
+    val query = String.format("SELECT * FROM %s.%s", DATABASE, 
TABLE_WRITE_TBL_RETRY)
+    var result: util.List[String] = null
+    val connection = getDorisQueryConnection(DATABASE)
+    breakable {
+      while (true) {
+        try {
+          // the query may fail transiently
+          result = ContainerUtils.executeSQLStatement(connection, LOG, query, 
2)
+        } catch {
+          case ex: Exception =>
+            LOG.error("Failed to query result, cause " + ex.getMessage)
+        }
+
+        // wait until at least one row has been inserted
+        if (result != null && result.size >= 1) {
+          Thread.sleep(5000)
+          ContainerUtils.executeSQLStatement(
+            connection,
+            LOG,
+            String.format("ALTER TABLE %s.%s MODIFY COLUMN address 
varchar(256)", DATABASE, TABLE_WRITE_TBL_RETRY))
+          break
+        }
+      }
+    }
+
+    future.get(60, TimeUnit.SECONDS)
+    session.stop()
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, TABLE_WRITE_TBL_RETRY),
+      2)
+    val expected = util.Arrays.asList("doris,1234", "spark,123456", 
"catalog,12345678");
+    checkResultInAnyOrder("testFailoverForRetry", expected.toArray, 
actual.toArray)
+  }
+
+
+  /**
+   * Test failover for task retry and sink.max-retries=0
+   */
+  @Test
+  def testFailoverForTaskRetry(): Unit = {
+    initializeTable(TABLE_WRITE_TBL_TASK_RETRY, DataModel.DUPLICATE)
+    val session = SparkSession.builder().master("local[1,100]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("doris", "cn"),
+      ("spark", "us"),
+      ("catalog", "uk")
+    )).toDF("name", "address")
+    df.createTempView("mock_source")
+
+    var uuid = UUID.randomUUID().toString
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_sink
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_TASK_RETRY}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.sink.batch.size"="1",
+         | "doris.sink.batch.interval.ms"="1000",
+         | "doris.sink.max-retries"="0",
+         | "doris.sink.enable-2pc"="true",
+         | "doris.sink.label.prefix"='${uuid}'
+         |)
+         |""".stripMargin)
+
+    val service = Executors.newSingleThreadExecutor()
+    val future = service.submit(new Runnable {
+      override def run(): Unit = {
+        session.sql("INSERT INTO test_sink SELECT * FROM mock_source")
+      }
+    })
+
+    val query = "show transaction from " + DATABASE + " where label like '" + 
uuid + "%'"
+    var result: List[String] = null
+    val connection = getDorisQueryConnection(DATABASE)
+    breakable {
+      while (true) {
+        try {
+          // the query may fail transiently
+          result = ContainerUtils.executeSQLStatement(connection, LOG, query, 
15).asScala.toList
+        } catch {
+          case ex: Exception =>
+            LOG.error("Failed to query result, cause " + ex.getMessage)
+        }
+
+        // wait until at least one transaction is visible and all are PRECOMMITTED
+        if (result != null && result.size >= 1 && result.forall(s => 
s.contains("PRECOMMITTED"))) {
+          faultInjectionOpen()
+          Thread.sleep(3000)
+          faultInjectionClear()
+          break
+        }
+      }
+    }
+
+    future.get(60, TimeUnit.SECONDS)
+    session.stop()
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, 
TABLE_WRITE_TBL_TASK_RETRY),
+      2)
+    val expected = util.Arrays.asList("doris,cn", "spark,us", "catalog,uk");
+    checkResultInAnyOrder("testFailoverForTaskRetry", expected.toArray, 
actual.toArray)
+  }
+
+
+  private def initializeTable(table: String, dataModel: DataModel): Unit = {
+    val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
+    val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else 
",\"enable_unique_key_merge_on_write\" = \"false\""
+    val model = if (dataModel == DataModel.UNIQUE_MOR) 
DataModel.UNIQUE.toString else dataModel.toString
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+      String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+      String.format("CREATE TABLE %s.%s ( \n"
+        + "`name` varchar(32),\n"
+        + "`address` varchar(4) %s\n"
+        + ") "
+        + " %s KEY(`name`) "
+        + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+        + "PROPERTIES ("
+        + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, 
max, model))
+  }
+
+  private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], 
actual: Array[AnyRef]): Unit = {
+    LOG.info("Checking DorisWriterFailoverITCase result. testName={}, 
actual={}, expected={}", testName, actual, expected)
+    assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
+  }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
index 7f1e393..51201e4 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -19,26 +19,36 @@ package org.apache.doris.spark.sql
 
 import 
org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder,
 getDorisQueryConnection}
 import org.apache.doris.spark.container.{AbstractContainerTestBase, 
ContainerUtils}
+import org.apache.doris.spark.rest.models.DataModel
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.Test
 import org.slf4j.LoggerFactory
 
 import java.util
 import scala.collection.JavaConverters._
+
+/**
+ * Integration test (IT) case for the Doris writer.
+ */
 class DorisWriterITCase extends AbstractContainerTestBase {
 
-  private val LOG = LoggerFactory.getLogger(classOf[DorisReaderITCase])
+  private val LOG = LoggerFactory.getLogger(classOf[DorisWriterITCase])
 
-  val DATABASE: String = "test"
+  val DATABASE: String = "test_doris_write"
   val TABLE_CSV: String = "tbl_csv"
+  val TABLE_CSV_HIDE_SEP: String = "tbl_csv_hide_sep"
+  val TABLE_GROUP_COMMIT: String = "tbl_group_commit"
   val TABLE_JSON: String = "tbl_json"
+  val TABLE_JSON_EMPTY_PARTITION: String = "tbl_json_empty_partition"
   val TABLE_JSON_TBL: String = "tbl_json_tbl"
+  val TABLE_JSON_TBL_OVERWRITE: String = "tbl_json_tbl_overwrite"
+  val TABLE_JSON_TBL_ARROW: String = "tbl_json_tbl_arrow"
 
   @Test
   @throws[Exception]
   def testSinkCsvFormat(): Unit = {
-    initializeTable(TABLE_CSV)
-    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    initializeTable(TABLE_CSV, DataModel.DUPLICATE)
+    val session = SparkSession.builder().master("local[1]").getOrCreate()
     val df = session.createDataFrame(Seq(
       ("doris_csv", 1),
       ("spark_csv", 2)
@@ -46,17 +56,20 @@ class DorisWriterITCase extends AbstractContainerTestBase {
     df.write
       .format("doris")
       .option("doris.fenodes", getFenodes)
+      .option("doris.sink.auto-redirect", false)
       .option("doris.table.identifier", DATABASE + "." + TABLE_CSV)
       .option("user", getDorisUsername)
       .option("password", getDorisPassword)
       .option("sink.properties.column_separator", ",")
       .option("sink.properties.line_delimiter", "\n")
       .option("sink.properties.format", "csv")
+      .option("doris.sink.batch.interval.ms", "5000")
+      .option("doris.sink.batch.size", "1")
       .mode(SaveMode.Append)
       .save()
     session.stop()
 
-    Thread.sleep(10000)
+    Thread.sleep(15000)
     val actual = ContainerUtils.executeSQLStatement(
       getDorisQueryConnection,
       LOG,
@@ -66,10 +79,136 @@ class DorisWriterITCase extends AbstractContainerTestBase {
     checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(), 
actual.toArray)
   }
 
+  @Test
+  @throws[Exception]
+  def testSinkCsvFormatHideSep(): Unit = {
+    initializeTable(TABLE_CSV_HIDE_SEP, DataModel.AGGREGATE)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("doris_csv", 1),
+      ("spark_csv", 2)
+    )).toDF("name", "age")
+    df.write
+      .format("doris")
+      .option("doris.fenodes", getFenodes + "," + getFenodes)
+      .option("doris.table.identifier", DATABASE + "." + TABLE_CSV_HIDE_SEP)
+      .option("user", getDorisUsername)
+      .option("password", getDorisPassword)
+      .option("sink.properties.column_separator", "\\x01")
+      .option("sink.properties.line_delimiter", "\\x02")
+      .option("sink.properties.format", "csv")
+      .mode(SaveMode.Append)
+      .save()
+    session.stop()
+
+    Thread.sleep(10000)
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, TABLE_CSV_HIDE_SEP),
+      2)
+    val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+    checkResultInAnyOrder("testSinkCsvFormatHideSep", expected.toArray(), 
actual.toArray)
+  }
+
+  @Test
+  @throws[Exception]
+  def testSinkGroupCommit(): Unit = {
+    initializeTable(TABLE_GROUP_COMMIT, DataModel.DUPLICATE)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("doris_csv", 1),
+      ("spark_csv", 2)
+    )).toDF("name", "age")
+    df.write
+      .format("doris")
+      .option("doris.fenodes", getFenodes)
+      .option("doris.table.identifier", DATABASE + "." + TABLE_GROUP_COMMIT)
+      .option("user", getDorisUsername)
+      .option("password", getDorisPassword)
+      .option("sink.properties.group_commit", "sync_mode")
+      .mode(SaveMode.Append)
+      .save()
+    session.stop()
+
+    Thread.sleep(10000)
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, TABLE_GROUP_COMMIT),
+      2)
+    val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+    checkResultInAnyOrder("testSinkGroupCommit", expected.toArray(), 
actual.toArray)
+  }
+
+  @Test
+  @throws[Exception]
+  def testSinkEmptyPartition(): Unit = {
+    initializeTable(TABLE_JSON_EMPTY_PARTITION, DataModel.AGGREGATE)
+    val session = SparkSession.builder().master("local[2]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("doris_json", 1)
+    )).toDF("name", "age")
+    df.repartition(2).write
+      .format("doris")
+      .option("doris.fenodes", getFenodes)
+      .option("doris.table.identifier", DATABASE + "." + 
TABLE_JSON_EMPTY_PARTITION)
+      .option("user", getDorisUsername)
+      .option("password", getDorisPassword)
+      .option("sink.properties.read_json_by_line", "true")
+      .option("sink.properties.format", "json")
+      .option("doris.sink.auto-redirect", "false")
+      .option("doris.sink.enable-2pc", "true")
+      .mode(SaveMode.Append)
+      .save()
+    session.stop()
+
+    Thread.sleep(10000)
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, 
TABLE_JSON_EMPTY_PARTITION),
+      2)
+    val expected = util.Arrays.asList("doris_json,1");
+    checkResultInAnyOrder("testSinkEmptyPartition", expected.toArray, 
actual.toArray)
+  }
+
+  @Test
+  @throws[Exception]
+  def testSinkArrowFormat(): Unit = {
+    initializeTable(TABLE_JSON_TBL_ARROW, DataModel.DUPLICATE)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("doris_json", 1),
+      ("spark_json", 2)
+    )).toDF("name", "age")
+    df.write
+      .format("doris")
+      .option("doris.fenodes", getFenodes)
+      .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_TBL_ARROW)
+      .option("user", getDorisUsername)
+      .option("password", getDorisPassword)
+      .option("sink.properties.format", "arrow")
+      .option("doris.sink.batch.size", "1")
+      .option("doris.sink.enable-2pc", "true")
+      .mode(SaveMode.Append)
+      .save()
+    session.stop()
+
+    Thread.sleep(10000)
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_ARROW),
+      2)
+    val expected = util.Arrays.asList("doris_json,1", "spark_json,2");
+    checkResultInAnyOrder("testSinkArrowFormat", expected.toArray, 
actual.toArray)
+  }
+
   @Test
   @throws[Exception]
   def testSinkJsonFormat(): Unit = {
-    initializeTable(TABLE_JSON)
+    initializeTable(TABLE_JSON, DataModel.UNIQUE)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
     val df = session.createDataFrame(Seq(
       ("doris_json", 1),
@@ -101,7 +240,7 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   @Test
   @throws[Exception]
   def testSQLSinkFormat(): Unit = {
-    initializeTable(TABLE_JSON_TBL)
+    initializeTable(TABLE_JSON_TBL, DataModel.UNIQUE_MOR)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
     val df = session.createDataFrame(Seq(
       ("doris_tbl", 1),
@@ -135,25 +274,74 @@ class DorisWriterITCase extends AbstractContainerTestBase {
     checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, actual.toArray)
   }
 
-
+  @Test
   @throws[Exception]
-  private def initializeTable(table: String): Unit = {
+  def testSQLSinkOverwrite(): Unit = {
+    initializeTable(TABLE_JSON_TBL_OVERWRITE, DataModel.DUPLICATE)
+    // init history data
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("insert into %s.%s  values ('history-doris',1118)", DATABASE, TABLE_JSON_TBL_OVERWRITE),
+      String.format("insert into %s.%s  values ('history-spark',1110)", DATABASE, TABLE_JSON_TBL_OVERWRITE))
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("doris_tbl", 1),
+      ("spark_tbl", 2)
+    )).toDF("name", "age")
+    df.createTempView("mock_source")
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_sink
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL_OVERWRITE}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.query.port"="${getQueryPort}",
+         | "doris.sink.label.prefix"="doris-label-customer",
+         | "doris.sink.enable-2pc"="true"
+         |)
+         |""".stripMargin)
+    session.sql(
+      """
+        |insert overwrite table test_sink select  name,age from mock_source
+        |""".stripMargin)
+    session.stop()
+
+    Thread.sleep(10000)
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_OVERWRITE),
+      2)
+    val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2");
+    checkResultInAnyOrder("testSQLSinkOverwrite", expected.toArray, actual.toArray)
+  }
+
+  private def initializeTable(table: String, dataModel: DataModel): Unit = {
+    val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
+    val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
+    val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
     ContainerUtils.executeSQLStatement(
       getDorisQueryConnection,
       LOG,
       String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
       String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
-      String.format(
-        "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" + ") " +
-          "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
-          "PROPERTIES (\n" +
-          "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table)
-    )
+      String.format("CREATE TABLE %s.%s ( \n"
+        + "`name` varchar(256),\n"
+        + "`age` int %s\n"
+        + ") "
+        + " %s KEY(`name`) "
+        + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+        + "PROPERTIES ("
+        + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, max, model))
   }
 
   private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = {
-    LOG.info("Checking DorisSourceITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
+    LOG.info("Checking DorisWriterITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
     assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
   }
-
 }
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql
new file mode 100644
index 0000000..967bdf5
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql
@@ -0,0 +1,31 @@
+DROP TABLE IF EXISTS tbl_write_tbl_all_types;
+
+CREATE TABLE tbl_write_tbl_all_types (
+`id` int,
+`c1` boolean,
+`c2` tinyint,
+`c3` smallint,
+`c4` int,
+`c5` bigint,
+`c6` largeint,
+`c7` float,
+`c8` double,
+`c9` decimal(12,4),
+`c10` date,
+`c11` datetime,
+`c12` char(1),
+`c13` varchar(256),
+`c14` string,
+`c15` Array<String>,
+`c16` Map<String, String>,
+`c17` Struct<name: String, age: int>,
+`c18` JSON,
+`c19` VARIANT
+)
+DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 2
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties
similarity index 76%
rename from spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
rename to spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties
index ecb73d3..de6bfd5 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties
@@ -16,8 +16,11 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=INFO, console
+rootLogger.level = info
+rootLogger.appenderRef.stdout.ref = console
 
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c [%t] %x - %m%n
\ No newline at end of file
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p [%t] %c{1}: %m%n%ex
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index f4ff49f..6628e9a 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -17,7 +17,6 @@
 
 package org.apache.doris.spark.write
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.doris.spark.client.write.{CopyIntoProcessor, DorisCommitter, DorisWriter, StreamLoadProcessor}
 import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.doris.spark.util.Retry
@@ -60,7 +59,7 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
       if (txnId.isDefined) {
         committedMessages += txnId.get
       } else {
-        throw new Exception("Failed to commit batch")
+        log.warn("No txn {} to commit batch", txnId)
       }
     }
     DorisWriterCommitMessage(partitionId, taskId, epochId, committedMessages.toArray)
@@ -106,7 +105,7 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
           recordBuffer.clear()
         }
         writer.resetBatchCount()
-        LockSupport.parkNanos(batchIntervalMs.toLong)
+        LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs.toLong).toNanos)
       }
       writer.load(record)
     } {
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
index cf2914f..e5fddaf 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
@@ -53,9 +53,7 @@ class DorisWrite(config: DorisConfig, schema: StructType) extends BatchWrite wit
   // for batch write
   override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = {
     LOG.info("writerCommitMessages size: " + writerCommitMessages.length)
-    writerCommitMessages.foreach(x => println(x))
     if (writerCommitMessages.exists(_ != null) && writerCommitMessages.nonEmpty) {
-      writerCommitMessages.foreach(x => println(x))
       writerCommitMessages.filter(_ != null)
         .foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.abort))
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
