This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 82925e08 [Improve](case) add itcase for dorissource, dorissink and lookup join (#569)
82925e08 is described below

commit 82925e08aa2525af9c66f36e47b7ebbce881384c
Author: wudi <676366...@qq.com>
AuthorDate: Wed Mar 12 15:57:26 2025 +0800

    [Improve](case) add itcase for dorissource, dorissink and lookup join (#569)
    
    * add source case
---
 flink-doris-connector/pom.xml                      |  14 +-
 .../doris/flink/catalog/doris/DataModel.java       |   1 +
 .../apache/doris/flink/cfg/DorisReadOptions.java   |   8 +
 .../java/org/apache/doris/flink/lookup/Worker.java |   2 +-
 .../apache/doris/flink/serialization/RowBatch.java |   1 +
 .../flink/container/AbstractContainerTestBase.java |   2 +-
 .../flink/container/AbstractITCaseService.java     |  14 ++
 .../doris/flink/container/ContainerUtils.java      |  51 +++-
 .../flink/container/e2e/Doris2DorisE2ECase.java    |  69 +++---
 .../flink/container/instance/ContainerService.java |   4 +
 .../flink/container/instance/DorisContainer.java   |  34 ++-
 .../doris/flink/lookup/DorisLookupTableITCase.java | 234 +++++++++++++++++-
 .../doris/flink/sink/DorisSinkFailoverITCase.java  | 265 +++++++++++++++++++++
 .../apache/doris/flink/sink/DorisSinkITCase.java   | 161 ++++++++++---
 .../doris/flink/source/DorisSourceITCase.java      | 189 +++++++++++----
 .../org/apache/doris/flink/utils/MockSource.java   |  12 +-
 .../doris2doris/test_doris2doris_sink_test_tbl.sql |  10 +-
 .../test_doris2doris_source_test_tbl.sql           |  73 ++----
 .../container/e2e/mysql2doris/testMySQL2Doris.txt  |   1 +
 .../src/test/resources/docker/doris/be.conf        |  99 ++++++++
 .../src/test/resources/docker/doris/fe.conf        |  74 ++++++
 21 files changed, 1130 insertions(+), 188 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index fde9eeb5..c584ebbf 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -572,13 +572,15 @@ under the License.
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.22.0</version>
+                <version>3.0.0-M5</version>
                 <configuration>
-                <!-- skip itcase e2ecase in mvn package
-                <includes>
-                      <include>**/*.java</include>
-                 </includes>
-                 -->
+                    <!-- Activate the use of TCP to transmit events to the plugin -->
+                    <forkNode implementation="org.apache.maven.plugin.surefire.extensions.SurefireForkNodeFactory"/>
+                    <!-- skip itcase e2ecase in mvn package
+                    <includes>
+                          <include>**/*.java</include>
+                     </includes>
+                     -->
                 </configuration>
             </plugin>
             <plugin>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java
index a1085b53..f92935fa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java
@@ -20,5 +20,6 @@ package org.apache.doris.flink.catalog.doris;
 public enum DataModel {
     DUPLICATE,
     UNIQUE,
+    UNIQUE_MOR,
     AGGREGATE
 }
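
The new UNIQUE_MOR value lets the test cases distinguish merge-on-read unique tables from the default merge-on-write ones. A minimal sketch of how a test helper might branch on it when generating table DDL; the helper name and property strings are illustrative assumptions, not connector API:

    // Hypothetical helper: picks the unique-key PROPERTIES for the model under test.
    static String mergeOnWriteProperty(DataModel model) {
        switch (model) {
            case UNIQUE:
                return "\"enable_unique_key_merge_on_write\" = \"true\"";
            case UNIQUE_MOR:
                // assumed to request the older merge-on-read behavior
                return "\"enable_unique_key_merge_on_write\" = \"false\"";
            default:
                return "";
        }
    }
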
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 1889ace5..7e1beea2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.flink.cfg;
 
+import org.apache.flink.util.Preconditions;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -408,6 +410,12 @@ public class DorisReadOptions implements Serializable {
          * @return a DorisReadOptions with the settings made for this builder.
          */
         public DorisReadOptions build() {
+
+            if (useFlightSql) {
+                Preconditions.checkNotNull(
+                        flightSqlPort, "flight sql port must be set when using flight sql to read.");
+            }
+
             return new DorisReadOptions(
                     readFields,
                     filterQuery,
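
With this guard, enabling Arrow Flight SQL reads without a port now fails fast at build time instead of failing later inside the reader. A hedged usage sketch, with setter names assumed from the builder fields referenced above:

    // Throws NullPointerException from the Preconditions check: no flight-sql port set.
    DorisReadOptions.builder()
            .setUseFlightSql(true)
            // .setFlightSqlPort(9611)  // uncomment to satisfy the new check
            .build();
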
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
index 8eef01cf..b744f454 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
@@ -107,6 +107,7 @@ public class Worker implements Runnable {
                 recordList.size(),
                 deduplicateList.size());
         StringBuilder sb = new StringBuilder();
+        sb.append("/* ApplicationName=Flink Lookup Query */ ");
         boolean first;
         for (int i = 0; i < deduplicateList.size(); i++) {
             if (i > 0) {
@@ -169,7 +170,6 @@ public class Worker implements Runnable {
 
     private void appendSelect(StringBuilder sb, LookupSchema schema) {
         String[] selectFields = schema.getSelectFields();
-        sb.append("/* ApplicationName=Flink Lookup Query */ ");
         sb.append(" select ");
         for (int i = 0; i < selectFields.length; i++) {
             if (i > 0) {
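
Moving the hint from appendSelect to the batch builder means it is emitted once per batched statement instead of once per unioned sub-select. A condensed sketch of the effect, with an illustrative query:

    StringBuilder sb = new StringBuilder();
    sb.append("/* ApplicationName=Flink Lookup Query */ ");
    for (int i = 0; i < 2; i++) {
        if (i > 0) {
            sb.append(" union all ");
        }
        sb.append(" select id, score from db.tbl where id = ?");
    }
    // before: "/* hint */ select ... union all /* hint */ select ..."
    // after:  "/* hint */ select ... union all select ..."
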
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index dee9c1fc..ddaf6600 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -462,6 +462,7 @@ public class RowBatch {
             case "VARCHAR":
             case "STRING":
             case "JSONB":
+            case "JSON":
             case "VARIANT":
                 if (!minorType.equals(Types.MinorType.VARCHAR)) {
                     return false;
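
Depending on the Doris version, the JSON logical type may be reported as either JSONB or JSON, and both arrive as Arrow VARCHAR vectors. A condensed sketch of the widened check, not the full RowBatch switch:

    import org.apache.arrow.vector.types.Types;

    static boolean isVarcharBacked(String dorisType, Types.MinorType minorType) {
        switch (dorisType) {
            case "VARCHAR":
            case "STRING":
            case "JSONB":
            case "JSON":    // newer Doris versions report JSON instead of JSONB
            case "VARIANT":
                return minorType.equals(Types.MinorType.VARCHAR);
            default:
                return false;  // remaining types are validated by other branches
        }
    }
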
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
index 5c7c151e..5433bbc2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertTrue;
 
 public abstract class AbstractContainerTestBase {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
-    private static ContainerService dorisContainerService;
+    protected static ContainerService dorisContainerService;
     public static final int DEFAULT_PARALLELISM = 2;
 
     @BeforeClass
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
index 6628933c..e6b8e163 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
 public abstract class AbstractITCaseService extends AbstractContainerTestBase {
@@ -131,4 +132,17 @@ public abstract class AbstractITCaseService extends AbstractContainerTestBase {
         } catch (InterruptedException ignored) {
         }
     }
+
+    protected JobStatus getFlinkJobStatus(JobClient jobClient) {
+        JobStatus jobStatus;
+        try {
+            jobStatus = jobClient.getJobStatus().get();
+        } catch (IllegalStateException e) {
+            LOG.info("Failed to get state, cause " + e.getMessage());
+            jobStatus = JobStatus.FINISHED;
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        return jobStatus;
+    }
 }
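
A hedged usage sketch of the new helper: poll an asynchronously submitted job until it reaches a terminal state, with the IllegalStateException branch above mapping an unreachable MiniCluster to FINISHED:

    JobClient jobClient = env.executeAsync();
    while (!getFlinkJobStatus(jobClient).isTerminalState()) {
        Thread.sleep(1000);  // avoid busy-waiting between status probes
    }
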
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
index f87e498c..6a0ee947 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
 public class ContainerUtils {
@@ -54,6 +55,30 @@ public class ContainerUtils {
         }
     }
 
+    public static List<String> executeSQLStatement(
+            Connection connection, Logger logger, String sql, int columnSize) {
+        List<String> result = new ArrayList<>();
+        if (Objects.isNull(sql)) {
+            return result;
+        }
+        try (Statement statement = connection.createStatement()) {
+            logger.info("start to execute sql={}", sql);
+            ResultSet resultSet = statement.executeQuery(sql);
+
+            while (resultSet.next()) {
+                StringJoiner sb = new StringJoiner(",");
+                for (int i = 1; i <= columnSize; i++) {
+                    Object value = resultSet.getObject(i);
+                    sb.add(String.valueOf(value));
+                }
+                result.add(sb.toString());
+            }
+            return result;
+        } catch (SQLException e) {
+            throw new DorisRuntimeException(e);
+        }
+    }
+
     public static String loadFileContent(String resourcePath) {
         try (InputStream stream =
                 ContainerUtils.class.getClassLoader().getResourceAsStream(resourcePath)) {
@@ -114,6 +139,23 @@ public class ContainerUtils {
             String query,
             int columnSize,
             boolean ordered) {
+        List<String> actual = getResult(connection, logger, expected, query, columnSize);
+        if (ordered) {
+            Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+        } else {
+            Assert.assertEquals(expected.size(), actual.size());
+            Assert.assertArrayEquals(
+                    expected.stream().sorted().toArray(Object[]::new),
+                    actual.stream().sorted().toArray(Object[]::new));
+        }
+    }
+
+    public static List<String> getResult(
+            Connection connection,
+            Logger logger,
+            List<String> expected,
+            String query,
+            int columnSize) {
         List<String> actual = new ArrayList<>();
         try (Statement statement = connection.createStatement()) {
             ResultSet sinkResultSet = statement.executeQuery(query);
@@ -141,13 +183,6 @@ public class ContainerUtils {
                 "checking test result. expected={}, actual={}",
                 String.join(",", expected),
                 String.join(",", actual));
-        if (ordered) {
-            Assert.assertArrayEquals(expected.toArray(), actual.toArray());
-        } else {
-            Assert.assertEquals(expected.size(), actual.size());
-            Assert.assertArrayEquals(
-                    expected.stream().sorted().toArray(Object[]::new),
-                    actual.stream().sorted().toArray(Object[]::new));
-        }
+        return actual;
     }
 }
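
Usage sketch for the two new helpers; the table and query are illustrative. executeSQLStatement with a columnSize returns each result row as one comma-joined string, which is the shape checkResult and getResult compare against:

    List<String> rows =
            ContainerUtils.executeSQLStatement(
                    getDorisQueryConnection(), LOG, "select id, score from test.tbl", 2);
    // rows look like ["1,100", "2,200"]
    ContainerUtils.checkResult(
            getDorisQueryConnection(), LOG, rows, "select id, score from test.tbl", 2, false);
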
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
index ce25c677..1b56be22 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
@@ -19,29 +19,39 @@ package org.apache.doris.flink.container.e2e;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
 
 import org.apache.doris.flink.container.AbstractContainerTestBase;
 import org.apache.doris.flink.container.ContainerUtils;
 import org.apache.doris.flink.table.DorisConfigOptions;
-import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
+@RunWith(Parameterized.class)
 public class Doris2DorisE2ECase extends AbstractContainerTestBase {
     private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class);
     private static final String DATABASE_SOURCE = "test_doris2doris_source";
     private static final String DATABASE_SINK = "test_doris2doris_sink";
     private static final String TABLE = "test_tbl";
 
+    private final boolean useFlightRead;
+
+    public Doris2DorisE2ECase(boolean useFlightRead) {
+        this.useFlightRead = useFlightRead;
+    }
+
+    @Parameterized.Parameters(name = "useFlightRead: {0}")
+    public static Object[] parameters() {
+        return new Object[][] {new Object[] {false}, new Object[] {true}};
+    }
+
     @Test
     public void testDoris2Doris() throws Exception {
         LOG.info("Start executing the test case of doris to doris.");
@@ -68,10 +78,12 @@ public class Doris2DorisE2ECase extends AbstractContainerTestBase {
                                 + "c11  TIMESTAMP, \n"
                                 + "c12  char(1), \n"
                                 + "c13  varchar(256), \n"
-                                + "c14  Array<String>, \n"
-                                + "c15  Map<String, String>, \n"
-                                + "c16  ROW<name String, age int>, \n"
-                                + "c17  STRING \n"
+                                + "c14  STRING, \n"
+                                + "c15  Array<String>, \n"
+                                + "c16  Map<String, String>, \n"
+                                + "c17  ROW<name String, age int>, \n"
+                                + "c18  STRING, \n"
+                                + "c19  STRING\n"
                                 + ") WITH ("
                                 + " 'connector' = '"
                                 + DorisConfigOptions.IDENTIFIER
@@ -82,12 +94,15 @@ public class Doris2DorisE2ECase extends AbstractContainerTestBase {
                                 + UUID.randomUUID()
                                 + "',"
                                 + " 'username' = '%s',"
-                                + " 'password' = '%s'"
+                                + " 'password' = '%s',"
+                                + " 'source.use-flight-sql' = '%s',\n"
+                                + " 'source.flight-sql-port' = '9611'"
                                 + ")",
                         getFenodes(),
                         DATABASE_SOURCE + "." + TABLE,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightRead);
         tEnv.executeSql(sourceDDL);
 
         String sinkDDL =
@@ -107,10 +122,12 @@ public class Doris2DorisE2ECase extends AbstractContainerTestBase {
                                 + "c11  TIMESTAMP, \n"
                                 + "c12  char(1), \n"
                                 + "c13  varchar(256), \n"
-                                + "c14  Array<String>, \n"
-                                + "c15  Map<String, String>, \n"
-                                + "c16  ROW<name String, age int>, \n"
-                                + "c17  STRING \n"
+                                + "c14  STRING, \n"
+                                + "c15  Array<String>, \n"
+                                + "c16  Map<String, String>, \n"
+                                + "c17  ROW<name String, age int>, \n"
+                                + "c18  STRING, \n"
+                                + "c19  STRING\n"
                                 + ") WITH ("
                                 + " 'connector' = '"
                                 + DorisConfigOptions.IDENTIFIER
@@ -131,21 +148,15 @@ public class Doris2DorisE2ECase extends AbstractContainerTestBase {
 
         tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM doris_source").await();
 
-        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink");
-        List<Object> actual = new ArrayList<>();
-        try (CloseableIterator<Row> iterator = tableResult.collect()) {
-            while (iterator.hasNext()) {
-                actual.add(iterator.next().toString());
-            }
-        }
-        LOG.info("The actual data in the doris sink table is, actual={}", actual);
+        List<String> expected =
+                Arrays.asList(
+                        "1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello, Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", \"key2\":\"value2\"},{\"name\": \"Tom\", \"age\": 30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}",
+                        "2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", \"k2\":\"v2\"},{\"name\": \"Jerry\", \"age\": 25},{\"status\":\"ok\"},{\"data\":[1,2,3]}",
+                        "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\": \"Alice\", \"age\": 40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}",
+                        "4,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null");
 
-        String[] expected =
-                new String[] {
-                    "+I[1, true, 127, 32767, 2147483647, 9223372036854775807, 123456789012345678901234567890, 3.14, 2.7182818284, 12345.6789, 2023-05-22, 2023-05-22T12:34:56, A, Example text, [item1, item2, item3], {key1=value1, key2=value2}, +I[John Doe, 30], {\"key\":\"value\"}]",
-                    "+I[2, false, -128, -32768, -2147483648, -9223372036854775808, -123456789012345678901234567890, -3.14, -2.7182818284, -12345.6789, 2024-01-01, 2024-01-01T00:00, B, Another example, [item4, item5, item6], {key3=value3, key4=value4}, +I[Jane Doe, 25], {\"another_key\":\"another_value\"}]"
-                };
-        Assert.assertArrayEquals(expected, actual.toArray(new String[0]));
+        String query = String.format("SELECT * FROM %s.%s", DATABASE_SINK, TABLE);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 20, false);
     }
 
     private void initializeDorisTable() {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
index 684de5a0..2dcbe210 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
@@ -25,6 +25,10 @@ public interface ContainerService {
 
     void startContainer();
 
+    default void restartContainer() {
+        throw new DorisRuntimeException("Only the doris docker container implements restart.");
+    }
+
     boolean isRunning();
 
     Connection getQueryConnection();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
index ef399d0d..1bd483c3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
@@ -25,6 +25,7 @@ import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
 
 import java.io.BufferedReader;
 import java.io.InputStream;
@@ -73,14 +74,23 @@ public class DorisContainer implements ContainerService {
                         .withLogConsumer(
                                 new Slf4jLogConsumer(
                                         DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
-                        .withExposedPorts(8030, 9030, 8040, 9060);
+                        // use custom conf
+                        .withCopyFileToContainer(
+                                MountableFile.forClasspathResource("docker/doris/be.conf"),
+                                "/opt/apache-doris/be/conf/be.conf")
+                        .withCopyFileToContainer(
+                                MountableFile.forClasspathResource("docker/doris/fe.conf"),
+                                "/opt/apache-doris/fe/conf/fe.conf")
+                        .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610);
 
         container.setPortBindings(
                 Lists.newArrayList(
                         String.format("%s:%s", "8030", "8030"),
                         String.format("%s:%s", "9030", "9030"),
                         String.format("%s:%s", "9060", "9060"),
-                        String.format("%s:%s", "8040", "8040")));
+                        String.format("%s:%s", "8040", "8040"),
+                        String.format("%s:%s", "9611", "9611"),
+                        String.format("%s:%s", "9610", "9610")));
         return container;
     }
 
@@ -90,6 +100,7 @@ public class DorisContainer implements ContainerService {
             // singleton doris container
             dorisContainer.start();
             initializeJdbcConnection();
+            initializeVariables();
             printClusterStatus();
         } catch (Exception ex) {
             LOG.error("Failed to start containers doris", ex);
@@ -98,6 +109,15 @@ public class DorisContainer implements ContainerService {
         LOG.info("Doris container started successfully.");
     }
 
+    @Override
+    public void restartContainer() {
+        LOG.info("Restart doris container.");
+        dorisContainer
+                .getDockerClient()
+                .restartContainerCmd(dorisContainer.getContainerId())
+                .exec();
+    }
+
     @Override
     public boolean isRunning() {
         return dorisContainer.isRunning();
@@ -115,6 +135,16 @@ public class DorisContainer implements ContainerService {
         }
     }
 
+    private void initializeVariables() throws Exception {
+        try (Connection connection = getQueryConnection();
+                Statement statement = connection.createStatement()) {
+            LOG.info("init doris cluster variables.");
+            // avoid arrow flight sql reading bug
+            statement.executeQuery("SET PROPERTY FOR 'root' 'max_user_connections' = '1024';");
+        }
+        LOG.info("Init variables successfully.");
+    }
+
     @Override
     public String getJdbcUrl() {
         return String.format(JDBC_URL, dorisContainer.getHost());
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
index 6d569bcf..8ecb87da 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.lookup;
 
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
@@ -30,7 +31,10 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.doris.flink.container.AbstractITCaseService;
 import org.apache.doris.flink.container.ContainerUtils;
 import org.apache.doris.flink.table.DorisConfigOptions;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,17 +42,36 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+@RunWith(Parameterized.class)
 public class DorisLookupTableITCase extends AbstractITCaseService {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisLookupTableITCase.class);
     private static final String DATABASE = "test_lookup";
     private static final String TABLE_READ_TBL = "tbl_read_tbl";
+    private static final String TABLE_DIM_TBL = "tbl_dim_tbl";
+
+    private StreamExecutionEnvironment env;
+
+    private final boolean async;
+
+    public DorisLookupTableITCase(boolean async) {
+        this.async = async;
+    }
+
+    @Parameterized.Parameters(name = "async: {0}")
+    public static Object[] parameters() {
+        return new Object[][] {new Object[] {false}, new Object[] {true}};
+    }
+
+    @Before
+    public void before() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+    }
 
     @Test
     public void testLookupTable() throws Exception {
         initializeTable();
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(DEFAULT_PARALLELISM);
         DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3, 4);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
         Schema schema =
@@ -76,13 +99,15 @@ public class DorisLookupTableITCase extends AbstractITCaseService {
                                 + "'table.identifier' = '%s',"
                                 + "'username' = '%s',"
                                 + "'password' = '%s',"
-                                + "'lookup.cache.max-rows' = '100'"
+                                + "'lookup.cache.max-rows' = '100',"
+                                + "'lookup.jdbc.async' = '%s'"
                                 + ")",
                         getFenodes(),
                         getDorisQueryUrl(),
                         DATABASE + "." + TABLE_READ_TBL,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        async);
         tEnv.executeSql(lookupDDL);
         TableResult tableResult =
                 tEnv.executeSql(
@@ -138,4 +163,205 @@ public class DorisLookupTableITCase extends AbstractITCaseService {
                         "insert into %s.%s  values (3,-106,-14878,1466614815449373200)",
                         DATABASE, DorisLookupTableITCase.TABLE_READ_TBL));
     }
+
+    @Test
+    public void testLookup() throws Exception {
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        initFlinkTable(tEnv);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .columnByExpression("proctime", "PROCTIME()")
+                        .build();
+
+        Table table = tEnv.fromDataStream(env.fromElements(1, 2, 3), schema);
+        tEnv.createTemporaryView("fact_table", table);
+
+        String query =
+                "select fact_table.f0,"
+                        + "doris_lookup.score"
+                        + " from fact_table"
+                        + " left join doris_lookup FOR SYSTEM_TIME AS OF fact_table.proctime on fact_table.f0 = doris_lookup.id";
+        TableResult tableResult = tEnv.executeSql(query);
+        CloseableIterator<Row> collectIter = tableResult.collect();
+        List<String> actual = collectSize(collectIter, 3);
+        String[] expected = new String[] {"+I[1, 100]", "+I[2, 200]", "+I[3, null]"};
+        assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray()));
+        collectIter.close();
+
+        // mock data
+        tEnv.dropTemporaryView("fact_table");
+        tEnv.createTemporaryView(
+                "fact_table", tEnv.fromDataStream(env.fromElements(1, 2, 3, 4), schema));
+
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format(
+                        "insert into %s.%s  values (3,300)",
+                        DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL));
+
+        TableResult tableResult2 = tEnv.executeSql(query);
+        CloseableIterator<Row> collectIter2 = tableResult2.collect();
+        actual = collectSize(collectIter2, 4);
+        expected = new String[] {"+I[1, 100]", "+I[2, 200]", "+I[3, 300]", "+I[4, null]"};
+        assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray()));
+        collectIter2.close();
+    }
+
+    @Test
+    public void testLookupCache() throws Exception {
+        // Asynchronous data may be found before the cache expires, and may be out of order.
+        if (async) {
+            return;
+        }
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .columnByExpression("proctime", "PROCTIME()")
+                        .build();
+
+        DataStreamSource<Integer> mockSource =
+                env.addSource(
+                        new SourceFunction<Integer>() {
+
+                            @Override
+                            public void run(SourceContext<Integer> ctx) throws Exception {
+                                ctx.collect(1);
+                                ctx.collect(2);
+                                // wait while the table value is changed, to verify the cache
+                                Thread.sleep(10000);
+                                ctx.collect(3);
+
+                                // query from cache
+                                ctx.collect(2);
+                                ctx.collect(1);
+
+                                // exceed max-rows so cached entries are evicted
+                                ctx.collect(4);
+                                ctx.collect(5);
+                                ctx.collect(6);
+
+                                // query from table get new value
+                                ctx.collect(2);
+                                ctx.collect(1);
+                            }
+
+                            @Override
+                            public void cancel() {}
+                        });
+
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        initFlinkTable(tEnv);
+        Table table = tEnv.fromDataStream(mockSource, schema);
+        tEnv.createTemporaryView("fact_table", table);
+
+        String query =
+                "select fact_table.f0,"
+                        + "doris_lookup.score"
+                        + " from fact_table"
+                        + " left join doris_lookup FOR SYSTEM_TIME AS OF fact_table.proctime on fact_table.f0 = doris_lookup.id";
+        TableResult tableResult = tEnv.executeSql(query);
+        CloseableIterator<Row> collectIter = tableResult.collect();
+
+        List<String> actual = new ArrayList<>();
+        int index = 0;
+        while (actual.size() < 10 && collectIter.hasNext()) {
+            String row = collectIter.next().toString();
+            if ("+I[2, 200]".equals(row) && index == 1) {
+                LOG.info(
+                        "update table values {}.{}",
+                        DATABASE,
+                        DorisLookupTableITCase.TABLE_DIM_TBL);
+                ContainerUtils.executeSQLStatement(
+                        getDorisQueryConnection(),
+                        LOG,
+                        String.format(
+                                "insert into %s.%s  values (1,1111),(2,2222)",
+                                DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL));
+            }
+            actual.add(row);
+            index++;
+        }
+
+        String[] expected =
+                new String[] {
+                    "+I[1, 100]",
+                    "+I[2, 200]",
+                    "+I[3, null]",
+                    "+I[2, 200]",
+                    "+I[1, 100]",
+                    "+I[4, null]",
+                    "+I[5, null]",
+                    "+I[6, null]",
+                    "+I[2, 2222]",
+                    "+I[1, 1111]"
+                };
+        assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray()));
+        collectIter.close();
+    }
+
+    private static List<String> collectSize(CloseableIterator<Row> iterator, int rows)
+            throws Exception {
+        List<String> result = new ArrayList<>();
+        while (result.size() < rows && iterator.hasNext()) {
+            result.add(iterator.next().toString());
+        }
+        return result;
+    }
+
+    private void initFlinkTable(StreamTableEnvironment tEnv) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format(
+                        "DROP TABLE IF EXISTS %s.%s",
+                        DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`id` int(11),\n"
+                                + "`score` int(11)\n"
+                                + ") UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 4\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL),
+                String.format(
+                        "insert into %s.%s  values (1,100)",
+                        DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL),
+                String.format(
+                        "insert into %s.%s  values (2,200)",
+                        DATABASE, DorisLookupTableITCase.TABLE_DIM_TBL));
+
+        String lookupDDL =
+                String.format(
+                        "CREATE TABLE `doris_lookup`("
+                                + "  `id` INTEGER,"
+                                + "  `score` INTEGER,"
+                                + "  PRIMARY KEY (`id`) NOT ENFORCED"
+                                + ")  WITH ("
+                                + "'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + "'fenodes' = '%s',"
+                                + "'jdbc-url' = '%s',"
+                                + "'table.identifier' = '%s',"
+                                + "'username' = '%s',"
+                                + "'password' = '%s',"
+                                + "'lookup.jdbc.async' = '%s',"
+                                + "'lookup.cache.ttl' = '10m',"
+                                + "'lookup.cache.max-rows' = '3'"
+                                + ")",
+                        getFenodes(),
+                        getDorisQueryUrl(),
+                        DATABASE + "." + TABLE_DIM_TBL,
+                        getDorisUsername(),
+                        getDorisPassword(),
+                        async);
+
+        tEnv.executeSql(lookupDDL);
+    }
 }
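
For readers tracing testLookupCache above: with 'lookup.cache.max-rows' = '3', looking up 4, 5 and 6 evicts keys 1 and 2, so the final lookups re-query Doris and observe the updated values 2222 and 1111. A stand-in sketch of that eviction reasoning using a plain LinkedHashMap; the connector's real cache is not part of this diff and need not be exactly LRU:

    import java.util.LinkedHashMap;
    import java.util.Map;

    Map<Integer, Integer> cache =
            new LinkedHashMap<Integer, Integer>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
                    return size() > 3;  // mirrors lookup.cache.max-rows = 3
                }
            };
    cache.put(1, 100);
    cache.put(2, 200);
    cache.put(3, null);  // misses are cached too
    cache.put(4, null);  // 4, 5, 6 push out 1, 2, 3 ...
    cache.put(5, null);
    cache.put(6, null);
    // ... so the next lookups of 2 and 1 go back to Doris and see 2222 / 1111.
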
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java
new file mode 100644
index 00000000..95156d38
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkFailoverITCase.java
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
+import org.apache.doris.flink.utils.MockSource;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/** DorisSink ITCase failover case */
+@RunWith(Parameterized.class)
+public class DorisSinkFailoverITCase extends AbstractITCaseService {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSinkFailoverITCase.class);
+    static final String DATABASE = "test_failover_sink";
+    static final String TABLE_JSON_TBL_RESTART_DORIS = "tbl_json_tbl_restart_doris";
+    static final String TABLE_JSON_TBL_LOAD_FAILURE = "tbl_json_tbl_load_failure";
+    static final String TABLE_JSON_TBL_CKPT_FAILURE = "tbl_json_tbl_ckpt_failure";
+
+    private final boolean batchMode;
+
+    public DorisSinkFailoverITCase(boolean batchMode) {
+        this.batchMode = batchMode;
+    }
+
+    @Parameterized.Parameters(name = "batchMode: {0}")
+    public static Object[] parameters() {
+        return new Object[][] {new Object[] {false}, new Object[] {true}};
+    }
+
+    /** test doris cluster failover */
+    @Test
+    public void testDorisClusterFailoverSink() throws Exception {
+        LOG.info("start to test testDorisClusterFailoverSink.");
+        makeFailoverTest(TABLE_JSON_TBL_RESTART_DORIS, FaultType.RESTART_FAILURE, 40);
+    }
+
+    /** mock precommit failure */
+    @Test
+    public void testStreamLoadFailoverSink() throws Exception {
+        LOG.info("start to test testStreamLoadFailoverSink.");
+        makeFailoverTest(TABLE_JSON_TBL_LOAD_FAILURE, FaultType.STREAM_LOAD_FAILURE, 20);
+    }
+
+    /** mock checkpoint failure when precommit or streamload successful */
+    @Test
+    public void testCheckpointFailoverSink() throws Exception {
+        LOG.info("start to test testCheckpointFailoverSink.");
+        makeFailoverTest(TABLE_JSON_TBL_CKPT_FAILURE, FaultType.CHECKPOINT_FAILURE, 20);
+    }
+
+    public void makeFailoverTest(String tableName, FaultType faultType, int totalRecords)
+            throws Exception {
+        initializeTable(tableName);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        int checkpointInterval = 5000;
+        env.enableCheckpointing(checkpointInterval);
+
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("format", "csv");
+        DorisSink.Builder<String> builder = DorisSink.builder();
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder
+                .setLabelPrefix(UUID.randomUUID().toString())
+                .enable2PC()
+                .setBatchMode(batchMode)
+                .setFlushQueueSize(4)
+                .setStreamLoadProp(properties);
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder
+                .setFenodes(getFenodes())
+                .setTableIdentifier(DATABASE + "." + tableName)
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
+
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(new SimpleStringSerializer())
+                .setDorisOptions(dorisBuilder.build());
+
+        int triggerCkptError = -1;
+        if (FaultType.CHECKPOINT_FAILURE.equals(faultType)) {
+            triggerCkptError = 7;
+        }
+        DataStreamSource<String> mockSource =
+                env.addSource(new MockSource(totalRecords, triggerCkptError));
+        mockSource.sinkTo(builder.build());
+        JobClient jobClient = env.executeAsync();
+        CompletableFuture<JobStatus> jobStatus = jobClient.getJobStatus();
+        LOG.info("Job status: {}", jobStatus);
+
+        String query = String.format("select * from %s", DATABASE + "." + tableName);
+        List<String> result;
+        int maxRestart = 5;
+        Random random = new Random();
+        while (true) {
+            result = ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, query, 2);
+
+            if (result.size() >= totalRecords * DEFAULT_PARALLELISM
+                    && getFlinkJobStatus(jobClient).equals(JobStatus.FINISHED)) {
+                // Batch mode can only achieve at least once
+                break;
+            }
+
+            // Wait until write is successful, then trigger error
+            if (result.size() > 1 && maxRestart-- >= 0) {
+                // trigger an error at a random time
+                int randomSleepMs = random.nextInt(30);
+                if (FaultType.STREAM_LOAD_FAILURE.equals(faultType)) {
+                    faultInjectionOpen();
+                    randomSleepMs = randomSleepMs + 20;
+                    LOG.info("Injecting fault, sleep {}s before recover", randomSleepMs);
+                    Thread.sleep(randomSleepMs * 1000);
+                    faultInjectionClear();
+                } else if (FaultType.RESTART_FAILURE.equals(faultType)) {
+                    // docker image restart time is about 60s
+                    randomSleepMs = randomSleepMs + 60;
+                    dorisContainerService.restartContainer();
+                    LOG.info(
+                            "Restarting doris cluster, sleep {}s before next restart",
+                            randomSleepMs);
+                    Thread.sleep(randomSleepMs * 1000);
+                }
+            } else {
+                // Avoid frequent queries
+                Thread.sleep(checkpointInterval);
+            }
+        }
+
+        // concatenate the expected values
+        List<String> expected = new ArrayList<>();
+        for (int i = 1; i <= totalRecords; i++) {
+            for (int j = 0; j < DEFAULT_PARALLELISM; j++) {
+                expected.add(i + "," + j);
+            }
+        }
+        if (!batchMode) {
+            ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2, false);
+        } else {
+            List<String> actualResult =
+                    ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, query, 2);
+            Assert.assertTrue(
+                    actualResult.size() >= expected.size() && actualResult.containsAll(expected));
+        }
+    }
+
+    public void faultInjectionOpen() throws IOException {
+        String pointName = "FlushToken.submit_flush_error";
+        String apiUrl =
+                String.format(
+                        "http://%s/api/debug_point/add/%s",
+                        dorisContainerService.getBenodes(), pointName);
+        HttpPost httpPost = new HttpPost(apiUrl);
+        httpPost.addHeader(
+                HttpHeaders.AUTHORIZATION,
+                auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
+        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                String reason = response.getStatusLine().toString();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity()));
+                } else {
+                    LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason);
+                }
+            }
+        }
+    }
+
+    public void faultInjectionClear() throws IOException {
+        String apiUrl =
+                String.format(
+                        "http://%s/api/debug_point/clear", dorisContainerService.getBenodes());
+        HttpPost httpPost = new HttpPost(apiUrl);
+        httpPost.addHeader(
+                HttpHeaders.AUTHORIZATION,
+                auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
+        try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+                int statusCode = response.getStatusLine().getStatusCode();
+                String reason = response.getStatusLine().toString();
+                if (statusCode == 200 && response.getEntity() != null) {
+                    LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity()));
+                } else {
+                    LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason);
+                }
+            }
+        }
+    }
+
+    private String auth(String user, String password) {
+        final String authInfo = user + ":" + password;
+        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + new String(encoded);
+    }
+
+    private void initializeTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`id` int,\n"
+                                + "`task_id` int\n"
+                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, table));
+    }
+
+    enum FaultType {
+        RESTART_FAILURE,
+        STREAM_LOAD_FAILURE,
+        CHECKPOINT_FAILURE
+    }
+}
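
The two helpers above drive the Doris BE debug-point HTTP API shown in the code (/api/debug_point/add/{name} and /api/debug_point/clear). A hedged sketch of the fault window the test opens around the running sink; the sleep length is arbitrary:

    faultInjectionOpen();   // BE starts failing flushes; stream loads error out
    Thread.sleep(20_000);   // let a few checkpoints hit the injected fault
    faultInjectionClear();  // BE recovers; the sink retries and the totals converge
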
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index dd74bb1c..0ebc48fc 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -41,8 +42,11 @@ import org.apache.doris.flink.sink.batch.DorisBatchSink;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.doris.flink.table.DorisConfigOptions;
 import org.apache.doris.flink.utils.MockSource;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,12 +58,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.api.common.JobStatus.FINISHED;
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 
 /** DorisSink ITCase with csv and arrow format. */
+@RunWith(Parameterized.class)
 public class DorisSinkITCase extends AbstractITCaseService {
     private static final Logger LOG = LoggerFactory.getLogger(DorisSinkITCase.class);
     static final String DATABASE = "test_sink";
@@ -75,6 +79,17 @@ public class DorisSinkITCase extends AbstractITCaseService {
     static final String TABLE_CSV_JM = "tbl_csv_jm";
     static final String TABLE_CSV_TM = "tbl_csv_tm";
 
+    private final boolean batchMode;
+
+    public DorisSinkITCase(boolean batchMode) {
+        this.batchMode = batchMode;
+    }
+
+    @Parameterized.Parameters(name = "batchMode: {0}")
+    public static Object[] parameters() {
+        return new Object[][] {new Object[] {false}, new Object[] {true}};
+    }
+
     @Rule
     public final MiniClusterWithClientResource miniClusterResource =
             new MiniClusterWithClientResource(
@@ -87,13 +102,19 @@ public class DorisSinkITCase extends AbstractITCaseService {
 
     @Test
     public void testSinkCsvFormat() throws Exception {
-        initializeTable(TABLE_CSV);
+        initializeTable(TABLE_CSV, DataModel.UNIQUE);
         Properties properties = new Properties();
         properties.setProperty("column_separator", ",");
         properties.setProperty("line_delimiter", "\n");
         properties.setProperty("format", "csv");
         DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
-        executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+        executionBuilder
+                .setLabelPrefix(UUID.randomUUID().toString())
+                .setStreamLoadProp(properties)
+                .setDeletable(false)
+                .setBufferCount(4)
+                .setBufferSize(5 * 1024 * 1024)
+                .setBatchMode(batchMode);
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
         dorisBuilder
                 .setFenodes(getFenodes())
@@ -110,7 +131,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
 
     @Test
     public void testSinkJsonFormat() throws Exception {
-        initializeTable(TABLE_JSON);
+        initializeTable(TABLE_JSON, DataModel.UNIQUE);
         Properties properties = new Properties();
         properties.setProperty("read_json_by_line", "true");
         properties.setProperty("format", "json");
@@ -124,7 +145,12 @@ public class DorisSinkITCase extends AbstractITCaseService {
         row2.put("age", 2);
 
         DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
-        executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+        executionBuilder
+                .setLabelPrefix(UUID.randomUUID().toString())
+                .setBatchMode(batchMode)
+                .setStreamLoadProp(properties)
+                // deletable needs to be false for the unique model
+                .setDeletable(false);
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
         dorisBuilder
                 .setFenodes(getFenodes())
@@ -166,7 +192,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
 
     @Test
     public void testTableSinkJsonFormat() throws Exception {
-        initializeTable(TABLE_JSON_TBL);
+        initializeTable(TABLE_JSON_TBL, DataModel.DUPLICATE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -191,6 +217,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
                                 + " 'sink.enable-2pc' = 'true',"
                                 + " 'sink.use-cache' = 'true',"
                                 + " 'sink.enable-delete' = 'false',"
+                                + " 'sink.enable.batch-mode' = '%s',"
                                 + " 'sink.ignore.update-before' = 'true',"
                                 + " 'sink.properties.format' = 'json',"
                                 + " 'sink.properties.read_json_by_line' = 'true',"
@@ -201,7 +228,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
                         getFenodes(),
                         DATABASE + "." + TABLE_JSON_TBL,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        batchMode);
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
 
@@ -218,7 +246,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
             LOG.info("benodes is empty, skip the test.");
             return;
         }
-        initializeTable(TABLE_TBL_AUTO_REDIRECT);
+        initializeTable(TABLE_TBL_AUTO_REDIRECT, DataModel.AGGREGATE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -239,6 +267,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
                                 + " 'password' = '%s',"
+                                + " 'sink.enable.batch-mode' = '%s',"
                                 + " 'sink.label-prefix' = 'doris_sink"
                                 + UUID.randomUUID()
                                 + "'"
@@ -247,7 +276,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
                         getBenodes(),
                         DATABASE + "." + TABLE_TBL_AUTO_REDIRECT,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        batchMode);
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all 
SELECT 'flink',2");
 
@@ -260,7 +290,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
 
     @Test
     public void testTableBatch() throws Exception {
-        initializeTable(TABLE_CSV_BATCH_TBL);
+        initializeTable(TABLE_CSV_BATCH_TBL, DataModel.UNIQUE_MOR);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -285,7 +315,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
                                 + " 'sink.properties.column_separator' = 
'\\x01',"
                                 + " 'sink.properties.line_delimiter' = 
'\\x02',"
                                 + " 'sink.ignore.update-before' = 'false',"
-                                + " 'sink.enable.batch-mode' = 'true',"
+                                + " 'sink.enable.batch-mode' = '%s',"
                                 + " 'sink.enable-delete' = 'true',"
                                 + " 'sink.flush.queue-size' = '2',"
                                 + " 'sink.buffer-flush.max-rows' = '10000',"
@@ -295,7 +325,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
                         getFenodes(),
                         DATABASE + "." + TABLE_CSV_BATCH_TBL,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        batchMode);
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql("INSERT INTO doris_sink_batch SELECT 'doris',1 union 
all SELECT 'flink',2");
 
@@ -309,7 +340,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
 
     @Test
     public void testDataStreamBatch() throws Exception {
-        initializeTable(TABLE_CSV_BATCH_DS);
+        initializeTable(TABLE_CSV_BATCH_DS, DataModel.AGGREGATE);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.setParallelism(DEFAULT_PARALLELISM);
@@ -328,6 +359,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
         DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
         executionBuilder
                 .setLabelPrefix(UUID.randomUUID().toString())
+                .setFlushQueueSize(3)
+                .setBatchMode(batchMode)
                 .setStreamLoadProp(properties)
                 .setBufferFlushMaxBytes(10485760)
                 .setBufferFlushMaxRows(10000)
@@ -350,7 +383,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
 
     @Test
     public void testTableGroupCommit() throws Exception {
-        initializeTable(TABLE_GROUP_COMMIT);
+        initializeTable(TABLE_GROUP_COMMIT, DataModel.DUPLICATE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -376,7 +409,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
                                 + " 'sink.properties.line_delimiter' = 
'\\x02',"
                                 + " 'sink.properties.group_commit' = 
'sync_mode',"
                                 + " 'sink.ignore.update-before' = 'false',"
-                                + " 'sink.enable.batch-mode' = 'true',"
+                                + " 'sink.enable.batch-mode' = '%s',"
                                 + " 'sink.enable-delete' = 'true',"
                                 + " 'sink.flush.queue-size' = '2',"
                                 + " 'sink.buffer-flush.max-rows' = '10000',"
@@ -386,7 +419,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
                         getFenodes(),
                         DATABASE + "." + TABLE_GROUP_COMMIT,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        batchMode);
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql(
                 "INSERT INTO doris_group_commit_sink SELECT 'doris',1 union 
all  SELECT 'group_commit',2 union all  SELECT 'flink',3");
@@ -401,7 +435,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
 
     @Test
     public void testTableGzFormat() throws Exception {
-        initializeTable(TABLE_GZ_FORMAT);
+        initializeTable(TABLE_GZ_FORMAT, DataModel.UNIQUE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -420,6 +454,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
                                 + " 'password' = '%s',"
+                                + " 'sink.enable.batch-mode' = '%s',"
+                                + " 'sink.enable-delete' = 'false',"
                                 + " 'sink.label-prefix' = '"
                                 + UUID.randomUUID()
                                 + "',"
@@ -430,7 +466,8 @@ public class DorisSinkITCase extends AbstractITCaseService {
                         getFenodes(),
                         DATABASE + "." + TABLE_GZ_FORMAT,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        batchMode);
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql(
                 "INSERT INTO doris_gz_format_sink SELECT 'doris',1 union all  
SELECT 'flink',2");
@@ -445,7 +482,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
     @Test
     public void testJobManagerFailoverSink() throws Exception {
         LOG.info("start to test JobManagerFailoverSink.");
-        initializeFailoverTable(TABLE_CSV_JM);
+        initializeFailoverTable(TABLE_CSV_JM, DataModel.DUPLICATE);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.enableCheckpointing(10000);
@@ -468,6 +505,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
         executionBuilder
                 .setLabelPrefix(UUID.randomUUID().toString())
                 .setStreamLoadProp(properties)
+                .setBatchMode(batchMode)
                 .setUseCache(true);
 
         builder.setDorisReadOptions(readOptionBuilder.build())
@@ -499,13 +537,17 @@ public class DorisSinkITCase extends AbstractITCaseService {
                 Arrays.asList("1,0", "1,1", "2,0", "2,1", "3,0", "3,1", "4,0", "4,1", "5,0", "5,1");
         String query =
                 String.format("select id,task_id from %s.%s order by 1,2", DATABASE, TABLE_CSV_JM);
-        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
+
+        List<String> actualResult =
+                ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, query, 2);
+        Assert.assertTrue(
+                actualResult.size() >= expected.size() && actualResult.containsAll(expected));
     }
 
     @Test
     public void testTaskManagerFailoverSink() throws Exception {
         LOG.info("start to test TaskManagerFailoverSink.");
-        initializeFailoverTable(TABLE_CSV_TM);
+        initializeFailoverTable(TABLE_CSV_TM, DataModel.DUPLICATE);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.enableCheckpointing(10000);
@@ -525,7 +567,10 @@ public class DorisSinkITCase extends AbstractITCaseService {
         properties.setProperty("column_separator", ",");
         properties.setProperty("line_delimiter", "\n");
         properties.setProperty("format", "csv");
-        executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+        executionBuilder
+                .setLabelPrefix(UUID.randomUUID().toString())
+                .setBatchMode(batchMode)
+                .setStreamLoadProp(properties);
 
         builder.setDorisReadOptions(readOptionBuilder.build())
                 .setDorisExecutionOptions(executionBuilder.build())
@@ -556,12 +601,17 @@ public class DorisSinkITCase extends AbstractITCaseService {
                 Arrays.asList("1,0", "1,1", "2,0", "2,1", "3,0", "3,1", "4,0", "4,1", "5,0", "5,1");
         String query =
                 String.format("select id,task_id from %s.%s order by 1,2", DATABASE, TABLE_CSV_TM);
-        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
+
+        List<String> actualResult =
+                ContainerUtils.getResult(getDorisQueryConnection(), LOG, expected, query, 2);
+        Assert.assertTrue(
+                actualResult.size() >= expected.size() && actualResult.containsAll(expected));
     }
 
     @Test
     public void testTableOverwrite() throws Exception {
-        initializeTable(TABLE_OVERWRITE);
+        LOG.info("start to test testTableOverwrite.");
+        initializeTable(TABLE_OVERWRITE, DataModel.AGGREGATE);
         // mock data
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
@@ -593,6 +643,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
                                 + " 'jdbc-url' = '%s',"
                                 + " 'username' = '%s',"
                                 + " 'password' = '%s',"
+                                + " 'sink.enable.batch-mode' = '%s',"
                                 + " 'sink.label-prefix' = '"
                                 + UUID.randomUUID()
                                 + "'"
@@ -601,18 +652,33 @@ public class DorisSinkITCase extends AbstractITCaseService {
                         DATABASE + "." + TABLE_OVERWRITE,
                         getDorisQueryUrl(),
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        batchMode);
         tEnv.executeSql(sinkDDL);
         TableResult tableResult =
                 tEnv.executeSql(
                         "INSERT OVERWRITE doris_overwrite_sink SELECT 
'doris',1 union all  SELECT 'overwrite',2 union all  SELECT 'flink',3");
 
-        tableResult.await(25000, TimeUnit.MILLISECONDS);
+        JobClient jobClient = tableResult.getJobClient().get();
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(FINISHED),
+                Deadline.fromNow(Duration.ofSeconds(120)));
+
         List<String> expected = Arrays.asList("doris,1", "flink,3", 
"overwrite,2");
-        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, 
query, 2);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, 
query, 2, false);
     }
 
-    private void initializeTable(String table) {
+    private void initializeTable(String table, DataModel dataModel) {
+        String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "";
+        String morProps =
+                !DataModel.UNIQUE_MOR.equals(dataModel)
+                        ? ""
+                        : ",\"enable_unique_key_merge_on_write\" = \"false\"";
+        String model =
+                dataModel.equals(DataModel.UNIQUE_MOR)
+                        ? DataModel.UNIQUE.toString()
+                        : dataModel.toString();
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
                 LOG,
@@ -621,15 +687,30 @@ public class DorisSinkITCase extends AbstractITCaseService {
                 String.format(
                         "CREATE TABLE %s.%s ( \n"
                                 + "`name` varchar(256),\n"
-                                + "`age` int\n"
-                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
-                                + "PROPERTIES (\n"
+                                + "`age` int %s\n"
+                                + ") "
+                                + " %s KEY(`name`) "
+                                + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+                                + "PROPERTIES ("
                                 + "\"replication_num\" = \"1\"\n"
-                                + ")\n",
-                        DATABASE, table));
+                                + morProps
+                                + ")",
+                        DATABASE,
+                        table,
+                        max,
+                        model));
     }
 
-    private void initializeFailoverTable(String table) {
+    private void initializeFailoverTable(String table, DataModel dataModel) {
+        String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "";
+        String morProps =
+                !DataModel.UNIQUE_MOR.equals(dataModel)
+                        ? ""
+                        : ",\"enable_unique_key_merge_on_write\" = \"false\"";
+        String model =
+                dataModel.equals(DataModel.UNIQUE_MOR)
+                        ? DataModel.UNIQUE.toString()
+                        : dataModel.toString();
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
                 LOG,
@@ -638,11 +719,17 @@ public class DorisSinkITCase extends AbstractITCaseService {
                 String.format(
                         "CREATE TABLE %s.%s ( \n"
                                 + "`id` int,\n"
-                                + "`task_id` int\n"
-                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+                                + "`task_id` int %s\n"
+                                + ") "
+                                + " %s KEY(`id`) "
+                                + "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
                                 + "PROPERTIES (\n"
                                 + "\"replication_num\" = \"1\"\n"
+                                + morProps
                                 + ")\n",
-                        DATABASE, table));
+                        DATABASE,
+                        table,
+                        max,
+                        model));
     }
 }
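
For reviewers who want to try the reworked table setup outside the ITCase, here is a minimal, self-contained sketch of the CREATE TABLE string the new initializeTable(table, dataModel) helpers are expected to produce. It assumes, as in the hunks above, that UNIQUE_MOR maps to a UNIQUE KEY table with "enable_unique_key_merge_on_write" = "false" and that AGGREGATE value columns use MAX; the database and table names below are placeholders.

    import org.apache.doris.flink.catalog.doris.DataModel;

    public class DdlSketch {
        // Mirrors the helper logic in DorisSinkITCase: pick the key model,
        // the aggregation function for value columns, and the MOR property.
        static String buildCreateTable(String database, String table, DataModel dataModel) {
            String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "";
            String morProps =
                    DataModel.UNIQUE_MOR.equals(dataModel)
                            ? ",\"enable_unique_key_merge_on_write\" = \"false\""
                            : "";
            String model =
                    DataModel.UNIQUE_MOR.equals(dataModel)
                            ? DataModel.UNIQUE.toString()
                            : dataModel.toString();
            return String.format(
                    "CREATE TABLE %s.%s (`name` varchar(256), `age` int %s) "
                            + "%s KEY(`name`) DISTRIBUTED BY HASH(`name`) BUCKETS 1 "
                            + "PROPERTIES (\"replication_num\" = \"1\"%s)",
                    database, table, max, model, morProps);
        }

        public static void main(String[] args) {
            // Prints a UNIQUE KEY DDL with merge-on-write disabled.
            System.out.println(buildCreateTable("test_sink", "tbl_demo", DataModel.UNIQUE_MOR));
        }
    }
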
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 3eb96597..1421c953 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -31,7 +31,9 @@ import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
+import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.cfg.DorisStreamOptions;
 import org.apache.doris.flink.container.AbstractITCaseService;
 import org.apache.doris.flink.container.ContainerUtils;
@@ -41,6 +43,8 @@ import org.apache.doris.flink.table.DorisConfigOptions;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +57,7 @@ import java.util.List;
 import java.util.Properties;
 
 /** DorisSource ITCase. */
+@RunWith(Parameterized.class)
 public class DorisSourceITCase extends AbstractITCaseService {
     private static final Logger LOG = LoggerFactory.getLogger(DorisSourceITCase.class);
     private static final String DATABASE = "test_source";
@@ -73,6 +78,24 @@ public class DorisSourceITCase extends AbstractITCaseService {
     private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY =
             "tbl_read_tbl_push_down_with_filter_query";
 
+    private final boolean useFlightSql;
+    private final int flightSqlPort;
+
+    public DorisSourceITCase(boolean useFlightSql, int flightSqlPort) {
+        this.useFlightSql = useFlightSql;
+        this.flightSqlPort = flightSqlPort;
+    }
+
+    @Parameterized.Parameters(name = "useFlightSql: {0}, flightSqlPort: {1}")
+    public static Object[] parameters() {
+        return new Object[][] {
+            new Object[] {
+                false, -1,
+            },
+            new Object[] {true, 9611}
+        };
+    }
+
     @Rule
     public final MiniClusterWithClientResource miniClusterResource =
             new MiniClusterWithClientResource(
@@ -85,7 +108,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
 
     @Test
     public void testSource() throws Exception {
-        initializeTable(TABLE_READ);
+        initializeTable(TABLE_READ, DataModel.AGGREGATE);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.setParallelism(DEFAULT_PARALLELISM);
@@ -99,8 +122,14 @@ public class DorisSourceITCase extends AbstractITCaseService {
         DorisSource<List<?>> source =
                 DorisSource.<List<?>>builder()
                         .setDorisOptions(dorisBuilder.build())
+                        .setDorisReadOptions(
+                                DorisReadOptions.builder()
+                                        .setFlightSqlPort(flightSqlPort)
+                                        .setUseFlightSql(useFlightSql)
+                                        .build())
                         .setDeserializer(new SimpleListDeserializationSchema())
                         .build();
+
         List<String> actual = new ArrayList<>();
         try (CloseableIterator<List<?>> iterator =
                 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Doris Source")
@@ -115,7 +144,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
 
     @Test
     public void testOldSourceApi() throws Exception {
-        initializeTable(TABLE_READ_OLD_API);
+        initializeTable(TABLE_READ_OLD_API, DataModel.UNIQUE);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         Properties properties = new Properties();
@@ -141,7 +170,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
 
     @Test
     public void testTableSource() throws Exception {
-        initializeTable(TABLE_READ_TBL);
+        initializeTable(TABLE_READ_TBL, DataModel.DUPLICATE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -159,12 +188,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'fenodes' = '%s',"
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
-                                + " 'password' = '%s'"
+                                + " 'password' = '%s',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_READ_TBL,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
         TableResult tableResult = tEnv.executeSql("SELECT * FROM 
doris_source");
 
@@ -192,7 +225,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
 
     @Test
     public void testTableSourceOldApi() throws Exception {
-        initializeTable(TABLE_READ_TBL_OLD_API);
+        initializeTable(TABLE_READ_TBL_OLD_API, DataModel.AGGREGATE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -231,7 +264,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
 
     @Test
     public void testTableSourceAllOptions() throws Exception {
-        initializeTable(TABLE_READ_TBL_ALL_OPTIONS);
+        initializeTable(TABLE_READ_TBL_ALL_OPTIONS, DataModel.DUPLICATE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -258,12 +291,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'doris.batch.size' = '1024',"
                                 + " 'doris.exec.mem.limit' = '2048mb',"
                                 + " 'doris.deserialize.arrow.async' = 'true',"
-                                + " 'doris.deserialize.queue.size' = '32'"
+                                + " 'doris.deserialize.queue.size' = '32',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_READ_TBL_ALL_OPTIONS,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
         TableResult tableResult = tEnv.executeSql("SELECT * FROM 
doris_source_all_options");
 
@@ -279,7 +316,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
 
     @Test
     public void testTableSourceFilterAndProjectionPushDown() throws Exception {
-        initializeTable(TABLE_READ_TBL_PUSH_DOWN);
+        initializeTable(TABLE_READ_TBL_PUSH_DOWN, DataModel.UNIQUE_MOR);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -296,12 +333,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'fenodes' = '%s',"
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
-                                + " 'password' = '%s'"
+                                + " 'password' = '%s',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
         TableResult tableResult =
                 tEnv.executeSql(
@@ -320,7 +361,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
 
     @Test
     public void testTableSourceTimestampFilterAndProjectionPushDown() throws Exception {
-        initializeTimestampTable(TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN);
+        initializeTimestampTable(TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN, DataModel.UNIQUE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
@@ -341,12 +382,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'fenodes' = '%s',"
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
-                                + " 'password' = '%s'"
+                                + " 'password' = '%s',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
 
         List<String> actualProjectionResult =
@@ -453,7 +498,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
     @Test
     public void testTableSourceFilterWithUnionAll() throws Exception {
         LOG.info("starting to execute testTableSourceFilterWithUnionAll case.");
-        initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
+        initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL, DataModel.UNIQUE_MOR);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -470,12 +515,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'fenodes' = '%s',"
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
-                                + " 'password' = '%s'"
+                                + " 'password' = '%s',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + 
TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
         String querySql =
                 "  SELECT * FROM doris_source_filter_with_union_all where age 
= '18'"
@@ -508,13 +557,19 @@ public class DorisSourceITCase extends 
AbstractITCaseService {
                 String.format(
                         "CREATE TABLE %s.%s ( \n"
                                 + "`name` varchar(256),\n"
-                                + "`dt` date,\n"
-                                + "`age` int\n"
-                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+                                + "`dt` date %s,\n"
+                                + "`age` int %s\n"
+                                + ") "
+                                + " %s KEY(`name`) "
+                                + "DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
                                 + "PROPERTIES (\n"
                                 + "\"replication_num\" = \"1\"\n"
                                 + ")\n",
-                        DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
+                        DATABASE,
+                        TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY,
+                        "",
+                        "",
+                        DataModel.UNIQUE),
                 String.format(
                         "insert into %s.%s  values 
('doris',date_sub(now(),INTERVAL 7 DAY), 18)",
                         DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
@@ -543,12 +598,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
                                 + " 'password' = '%s',"
-                                + " 'doris.filter.query' = ' (dt = 
DATE_FORMAT(TIMESTAMPADD(DAY , -7, NOW()), ''yyyy-MM-dd'')) '"
+                                + " 'doris.filter.query' = ' (dt = 
DATE_FORMAT(TIMESTAMPADD(DAY , -7, NOW()), ''yyyy-MM-dd'')) ',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + 
TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
         String querySql =
                 "  SELECT * FROM doris_source_filter_with_filter_query where 
name = 'doris' and age > 2";
@@ -571,7 +630,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
     @Test
     public void testTableSourceFilterWithUnionAllNotEqualFilter() throws Exception {
         LOG.info("starting to execute testTableSourceFilterWithUnionAllNotEqualFilter case.");
-        initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER);
+        initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER, DataModel.AGGREGATE);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -588,12 +647,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'fenodes' = '%s',"
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
-                                + " 'password' = '%s'"
+                                + " 'password' = '%s',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + 
TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
         String querySql =
                 "  SELECT * FROM doris_source_filter_with_union_all where name 
= 'doris'"
@@ -616,7 +679,7 @@ public class DorisSourceITCase extends 
AbstractITCaseService {
     @Test
     public void testJobManagerFailoverSource() throws Exception {
         LOG.info("start to test JobManagerFailoverSource.");
-        initializeTableWithData(TABLE_CSV_JM);
+        initializeTableWithData(TABLE_CSV_JM, DataModel.DUPLICATE);
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.enableCheckpointing(200L);
@@ -634,12 +697,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'fenodes' = '%s',"
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
-                                + " 'password' = '%s'"
+                                + " 'password' = '%s',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_CSV_JM,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
         TableResult tableResult = tEnv.executeSql("select * from 
doris_source_jm");
         CloseableIterator<Row> iterator = tableResult.collect();
@@ -697,7 +764,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
     @Test
     public void testTaskManagerFailoverSource() throws Exception {
         LOG.info("start to test TaskManagerFailoverSource.");
-        initializeTableWithData(TABLE_CSV_TM);
+        initializeTableWithData(TABLE_CSV_TM, DataModel.UNIQUE);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(DEFAULT_PARALLELISM);
         env.enableCheckpointing(200L);
@@ -715,12 +782,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                                 + " 'fenodes' = '%s',"
                                 + " 'table.identifier' = '%s',"
                                 + " 'username' = '%s',"
-                                + " 'password' = '%s'"
+                                + " 'password' = '%s',"
+                                + " 'source.use-flight-sql' = '%s',"
+                                + " 'source.flight-sql-port' = '%s'"
                                 + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_CSV_TM,
                         getDorisUsername(),
-                        getDorisPassword());
+                        getDorisPassword(),
+                        useFlightSql,
+                        flightSqlPort);
         tEnv.executeSql(sourceDDL);
         TableResult tableResult = tEnv.executeSql("select * from 
doris_source_tm");
         CloseableIterator<Row> iterator = tableResult.collect();
@@ -759,7 +830,15 @@ public class DorisSourceITCase extends AbstractITCaseService {
         assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual));
     }
 
-    private void initializeTable(String table) {
+    private void initializeTable(String table, DataModel dataModel) {
+        String morProps =
+                !DataModel.UNIQUE_MOR.equals(dataModel)
+                        ? ""
+                        : ",\"enable_unique_key_merge_on_write\" = \"false\"";
+        String model =
+                dataModel.equals(DataModel.UNIQUE_MOR)
+                        ? DataModel.UNIQUE.toString()
+                        : dataModel.toString();
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
                 LOG,
@@ -768,18 +847,25 @@ public class DorisSourceITCase extends AbstractITCaseService {
                 String.format(
                         "CREATE TABLE %s.%s ( \n"
                                 + "`name` varchar(256),\n"
-                                + "`age` int\n"
-                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+                                + "`age` int %s\n"
+                                + ") "
+                                + " %s KEY(`name`) "
+                                + " DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
                                 + "PROPERTIES (\n"
                                 + "\"replication_num\" = \"1\"\n"
-                                + ")\n",
-                        DATABASE, table),
+                                + morProps
+                                + ")",
+                        DATABASE,
+                        table,
+                        DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "",
+                        model),
                 String.format("insert into %s.%s  values ('doris',18)", 
DATABASE, table),
                 String.format("insert into %s.%s  values ('flink',10)", 
DATABASE, table),
                 String.format("insert into %s.%s  values ('apache',12)", 
DATABASE, table));
     }
 
-    private void initializeTimestampTable(String table) {
+    private void initializeTimestampTable(String table, DataModel dataModel) {
+        String max = DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "";
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
                 LOG,
@@ -789,14 +875,16 @@ public class DorisSourceITCase extends AbstractITCaseService {
                         "CREATE TABLE %s.%s ( \n"
                                 + "`id` int,\n"
                                 + "`name` varchar(256),\n"
-                                + "`age` int,\n"
-                                + "`birthday` datetime,\n"
-                                + "`brilliant_time` datetime(6),\n"
-                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
+                                + "`age` int %s,\n"
+                                + "`birthday` datetime %s,\n"
+                                + "`brilliant_time` datetime(6) %s,\n"
+                                + ") "
+                                + " %s KEY(`id`, `name`) "
+                                + " DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
                                 + "PROPERTIES (\n"
                                 + "\"replication_num\" = \"1\"\n"
                                 + ")\n",
-                        DATABASE, table),
+                        DATABASE, table, max, max, max, dataModel.toString()),
                 String.format(
                         "insert into %s.%s  values 
(1,'Kevin',54,'2023-01-01T00:00:00','2023-01-01T00:00:00.000001')",
                         DATABASE, table),
@@ -817,7 +905,7 @@ public class DorisSourceITCase extends AbstractITCaseService {
                         DATABASE, table));
     }
 
-    private void initializeTableWithData(String table) {
+    private void initializeTableWithData(String table, DataModel dataModel) {
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
                 LOG,
@@ -826,12 +914,17 @@ public class DorisSourceITCase extends AbstractITCaseService {
                 String.format(
                         "CREATE TABLE %s.%s ( \n"
                                 + "`name` varchar(256),\n"
-                                + "`age` int\n"
-                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+                                + "`age` int %s\n"
+                                + ") "
+                                + " %s KEY(`name`) "
+                                + "DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
                                 + "PROPERTIES (\n"
                                 + "\"replication_num\" = \"1\"\n"
                                 + ")\n",
-                        DATABASE, table),
+                        DATABASE,
+                        table,
+                        DataModel.AGGREGATE.equals(dataModel) ? "MAX" : "",
+                        dataModel.toString()),
                 String.format(
                         "insert into %s.%s  values 
('101',1),('102',1),('103',1)", DATABASE, table),
                 String.format(
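
As a usage sketch for the new parameterization above: the same source can be built directly against DorisReadOptions, toggling between the default thrift scan and Arrow Flight SQL. This is a sketch only; the fenodes, credentials and table identifier are placeholders, the flight port must match arrow_flight_sql_port in fe.conf (9611 in the docker conf added below), and SimpleListDeserializationSchema is assumed to live in org.apache.doris.flink.deserialization as in the existing tests.

    import java.util.List;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.cfg.DorisReadOptions;
    import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
    import org.apache.doris.flink.source.DorisSource;

    public class FlightSqlSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Port must match arrow_flight_sql_port in fe.conf (9611 in the docker conf).
            DorisReadOptions readOptions =
                    DorisReadOptions.builder()
                            .setUseFlightSql(true) // false keeps the default thrift scan path
                            .setFlightSqlPort(9611)
                            .build();
            DorisOptions options =
                    DorisOptions.builder()
                            .setFenodes("127.0.0.1:8030") // placeholder FE address
                            .setTableIdentifier("test_source.tbl_read") // placeholder table
                            .setUsername("root")
                            .setPassword("")
                            .build();
            DorisSource<List<?>> source =
                    DorisSource.<List<?>>builder()
                            .setDorisOptions(options)
                            .setDorisReadOptions(readOptions)
                            .setDeserializer(new SimpleListDeserializationSchema())
                            .build();
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Doris Source").print();
            env.execute("flight-sql-source-sketch");
        }
    }
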
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
index a87e2205..fbbc0863 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/MockSource.java
@@ -40,6 +40,7 @@ public class MockSource extends RichParallelSourceFunction<String>
     private transient ListState<Long> state;
     private Long id = 0L;
     private int numEventsTotal;
+    private int failCheckpointId = -1;
     private volatile boolean running = true;
     private volatile long waitNextCheckpoint = 0L;
     private volatile long lastCheckpointConfirmed = 0L;
@@ -48,6 +49,11 @@ public class MockSource extends RichParallelSourceFunction<String>
         this.numEventsTotal = numEventsTotal;
     }
 
+    public MockSource(int numEventsTotal, int failCheckpointId) {
+        this.numEventsTotal = numEventsTotal;
+        this.failCheckpointId = failCheckpointId;
+    }
+
     @Override
     public void run(SourceContext<String> ctx) throws Exception {
 
@@ -73,7 +79,11 @@ public class MockSource extends RichParallelSourceFunction<String>
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
         state.update(Collections.singletonList(id));
-        LOG.info("snapshot state to {}", id);
+        if (failCheckpointId > 0 && context.getCheckpointId() % failCheckpointId == 0) {
+            throw new RuntimeException(
+                    "Trigger fail for testing, checkpointId = " + context.getCheckpointId());
+        }
+        LOG.info("snapshot state to {} for checkpoint {}", id, context.getCheckpointId());
     }
 
     @Override
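
A minimal sketch of how the new MockSource(numEventsTotal, failCheckpointId) constructor can drive a failover test: every failCheckpointId-th checkpoint throws in snapshotState(), forcing a restart and a replay from the last successful checkpoint. The Doris sink wiring and restart strategy are elided here; output just goes to print().

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import org.apache.doris.flink.utils.MockSource;

    public class MockSourceFailoverSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(1000);
            // 25 events per subtask; every 5th checkpoint fails in snapshotState(),
            // so the job restarts and replays from the last successful checkpoint
            // (a restart strategy would be configured in a real test).
            env.addSource(new MockSource(25, 5)).print();
            env.execute("mock-source-failover-sketch");
        }
    }
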
diff --git a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql
index 20ffb525..b098f58a 100644
--- a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql
+++ b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_sink_test_tbl.sql
@@ -17,10 +17,12 @@ CREATE TABLE test_doris2doris_sink.test_tbl (
       `c11` datetime,
       `c12` char(1),
       `c13` varchar(256),
-      `c14` Array<String>,
-      `c15` Map<String, String>,
-      `c16` Struct<name: String, age: int>,
-      `c17` JSON
+      `c14` string,
+      `c15` Array<String>,
+      `c16` Map<String, String>,
+      `c17` Struct<name: String, age: int>,
+      `c18` JSON,
+      `c19` VARIANT
 )
     DUPLICATE KEY(`id`)
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
diff --git a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql
index 5e57b50b..568a4684 100644
--- a/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql
+++ b/flink-doris-connector/src/test/resources/container/e2e/doris2doris/test_doris2doris_source_test_tbl.sql
@@ -16,11 +16,13 @@ CREATE TABLE test_doris2doris_source.test_tbl (
      `c10` date,
      `c11` datetime,
      `c12` char(1),
-     `c13` varchar(256),
-     `c14` Array<String>,
-     `c15` Map<String, String>,
-     `c16` Struct<name: String, age: int>,
-     `c17` JSON
+     `c13` varchar(16),
+     `c14` string,
+     `c15` Array<String>,
+     `c16` Map<String, String>,
+     `c17` Struct<name: String, age: int>,
+     `c18` JSON,
+     `c19` JSON -- Doris 2.1.0 cannot read VARIANT
 )
 DUPLICATE KEY(`id`)
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
@@ -29,45 +31,22 @@ PROPERTIES (
 "light_schema_change" = "true"
 );
 
-INSERT INTO test_doris2doris_source.test_tbl
-VALUES
-    (
-        1,
-        TRUE,
-        127,
-        32767,
-        2147483647,
-        9223372036854775807,
-        123456789012345678901234567890,
-        3.14,
-        2.7182818284,
-        12345.6789,
-        '2023-05-22',
-        '2023-05-22 12:34:56',
-        'A',
-        'Example text',
-        ['item1', 'item2', 'item3'],
-        {'key1': 'value1', 'key2': 'value2'},
-        STRUCT('John Doe', 30),
-        '{"key": "value"}'
-    ),
-    (
-        2,
-        FALSE,
-        -128,
-        -32768,
-        -2147483648,
-        -9223372036854775808,
-        -123456789012345678901234567890,
-        -3.14,
-        -2.7182818284,
-        -12345.6789,
-        '2024-01-01',
-        '2024-01-01 00:00:00',
-        'B',
-        'Another example',
-        ['item4', 'item5', 'item6'],
-        {'key3': 'value3', 'key4': 'value4'},
-        STRUCT('Jane Doe', 25),
-        '{"another_key": "another_value"}'
-);
\ No newline at end of file
+INSERT INTO test_doris2doris_source.test_tbl VALUES
+    (1, true, 127, 32767, 2147483647, 9223372036854775807, 170141183460469231731687303715884105727,
+     3.14, 2.71828, 12345.6789, '2025-03-11', '2025-03-11 12:34:56', 'A', 'Hello, Doris!', 'This is a string',
+        ['Alice', 'Bob'], {'key1': 'value1', 'key2': 'value2'}, STRUCT('Tom', 30), '{"key": "value"}', '{"type": "variant", "data": 123}');
+
+INSERT INTO test_doris2doris_source.test_tbl VALUES
+    (2, false, -128, -32768, -2147483648, -9223372036854775808, -170141183460469231731687303715884105728,
+     -1.23, 0.0001, -9999.9999, '2024-12-25', '2024-12-25 23:59:59', 'B', 'Doris Test', 'Another string!',
+        ['Charlie', 'David'], {'k1': 'v1', 'k2': 'v2'}, STRUCT('Jerry', 25), '{"status": "ok"}', '{"data": [1, 2, 3]}' );
+
+INSERT INTO test_doris2doris_source.test_tbl VALUES
+    (3, true, 0, 0, 0, 0, 0,
+     0.0, 0.0, 0.0000, '2023-06-15', '2023-06-15 08:00:00', 'C', 'Test Doris', 'Sample text',
+        ['Eve', 'Frank'], {'alpha': 'beta'}, STRUCT('Alice', 40), '{"nested": {"key": "value"}}', '{"variant": "test"}');
+
+INSERT INTO test_doris2doris_source.test_tbl VALUES
+    (4, NULL, NULL, NULL, NULL, NULL, NULL,
+     NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
+     NULL, NULL, NULL, NULL, NULL);
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
index d88b2088..90a0eddc 100644
--- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
@@ -2,6 +2,7 @@ mysql-sync-database
     --database test_e2e_mysql
     --mysql-conf database-name=test_e2e_mysql
     --including-tables "tbl.*"
+    --sink-conf sink.ignore.update-before=false
     --table-conf replication_num=1
     --single-sink true
     --ignore-default-value false
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/docker/doris/be.conf b/flink-doris-connector/src/test/resources/docker/doris/be.conf
new file mode 100644
index 00000000..94b76e09
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/docker/doris/be.conf
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+PPROF_TMPDIR="$DORIS_HOME/log/"
+
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true 
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 9+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 17+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED"
+
+# since 1.2, the JAVA_HOME need to be set to run BE process.
+# JAVA_HOME=/path/to/jdk/
+
+# https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
+# https://jemalloc.net/jemalloc.3.html
+JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:false,lg_prof_interval:32,lg_prof_sample:19,prof_gdump:false,prof_accum:false,prof_leak:false,prof_final:false"
+JEMALLOC_PROF_PRFIX=""
+
+# INFO, WARNING, ERROR, FATAL
+sys_log_level = INFO
+
+# ports for admin, web, heartbeat service
+be_port = 9060
+webserver_port = 8040
+heartbeat_service_port = 9050
+brpc_port = 8060
+arrow_flight_sql_port = 9610
+enable_debug_points = true
+
+# HTTPS configures
+enable_https = false
+# path of certificate in PEM format.
+ssl_certificate_path = "$DORIS_HOME/conf/cert.pem"
+# path of private key in PEM format.
+ssl_private_key_path = "$DORIS_HOME/conf/key.pem"
+
+
+# Choose one if there are more than one ip except loopback address.
+# Note that there should at most one ip match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# data root path, separate by ';'
+# You can specify the storage type for each root path, HDD (cold data) or SSD (hot data)
+# eg:
+# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris
+# storage_root_path = /home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD
+# /home/disk2/doris,medium:HDD(default)
+#
+# you also can specify the properties by setting '<property>:<value>', separate by ','
+# property 'medium' has a higher priority than the extension of path
+#
+# Default value is ${DORIS_HOME}/storage, you should create it by hand.
+# storage_root_path = ${DORIS_HOME}/storage
+
+# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+# Advanced configurations
+# sys_log_dir = ${DORIS_HOME}/log
+# sys_log_roll_mode = SIZE-MB-1024
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = *
+# log_buffer_level = -1
+# palo_cgroups
+
+# aws sdk log level
+#    Off = 0,
+#    Fatal = 1,
+#    Error = 2,
+#    Warn = 3,
+#    Info = 4,
+#    Debug = 5,
+#    Trace = 6
+# Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs
+aws_log_level=0
+## If you are not running in aws cloud, you can disable EC2 metadata
+AWS_EC2_METADATA_DISABLED=true
\ No newline at end of file
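
One wiring detail in these configs: be.conf exposes arrow_flight_sql_port = 9610 on the BE, while fe.conf (next file) exposes 9611 on the FE, which is the port the parameterized DorisSourceITCase passes as flightSqlPort. A hedged connectivity check, assuming the Arrow Flight SQL JDBC driver is on the classpath and using the URL scheme from the Doris docs (host and credentials are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class FlightSqlPortCheck {
        public static void main(String[] args) throws Exception {
            // 9611 is the FE arrow_flight_sql_port from fe.conf below.
            String url = "jdbc:arrow-flight-sql://127.0.0.1:9611?useEncryption=false";
            try (Connection conn = DriverManager.getConnection(url, "root", "");
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery("select 1")) {
                while (rs.next()) {
                    System.out.println(rs.getInt(1));
                }
            }
        }
    }
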
diff --git a/flink-doris-connector/src/test/resources/docker/doris/fe.conf b/flink-doris-connector/src/test/resources/docker/doris/fe.conf
new file mode 100644
index 00000000..a45fb535
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/docker/doris/fe.conf
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#####################################################################
+## The uppercase properties are read and exported by bin/start_fe.sh.
+## To see all Frontend configurations,
+## see fe/src/org/apache/doris/common/Config.java
+#####################################################################
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+# Log dir
+LOG_DIR = ${DORIS_HOME}/log
+
+# For jdk 17, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR -Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED"
+
+# Set your own JAVA_HOME
+# JAVA_HOME=/path/to/jdk/
+
+##
+## the lowercase properties are read by main program.
+##
+
+# store metadata, must be created before start FE.
+# Default value is ${DORIS_HOME}/doris-meta
+# meta_dir = ${DORIS_HOME}/doris-meta
+
+# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+http_port = 8030
+rpc_port = 9020
+query_port = 9030
+edit_log_port = 9010
+arrow_flight_sql_port = 9611
+enable_debug_points = true
+arrow_flight_token_cache_size = 50
+# Choose one if there are more than one ip except loopback address.
+# Note that there should at most one ip match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# Advanced configurations
+# log_roll_size_mb = 1024
+# INFO, WARN, ERROR, FATAL
+sys_log_level = INFO
+# NORMAL, BRIEF, ASYNC
+sys_log_mode = ASYNC
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = org.apache.doris
+# audit_log_dir = $LOG_DIR
+# audit_log_modules = slow_query, query
+# audit_log_roll_num = 10
+# meta_delay_toleration_second = 10
+# qe_max_connection = 1024
+# qe_query_timeout_second = 300
+# qe_slow_log_ms = 5000
\ No newline at end of file

