This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new cdbeabd  [Improve](case) add doris source it case (#296)
cdbeabd is described below

commit cdbeabd1f12d0b0577d32fe64c57b735669f4a94
Author: wudi <676366...@qq.com>
AuthorDate: Wed Mar 26 16:30:24 2025 +0800

    [Improve](case) add doris source it case (#296)
---
 .github/workflows/run-itcase-12.yml                |  51 ---
 .../{run-itcase-20.yml => run-itcase.yml}          |  10 +-
 .licenserc.yaml                                    |  28 +-
 .../spark/client/read/AbstractThriftReader.java    |   3 +-
 .../spark/client/read/DorisFlightSqlReader.java    |  27 +-
 .../client/read/ReaderPartitionGenerator.java      |   8 +-
 .../apache/doris/spark/client/read/RowBatch.java   |  16 +-
 .../spark/exception/DorisRuntimeException.java     |  45 +++
 .../apache/doris/spark/rest/models/DataModel.java  |  25 ++
 .../apache/doris/spark/util/SchemaConvertors.scala |   2 +-
 .../doris/spark/client/read/RowBatchTest.java      |  15 +-
 .../java/org/apache/doris/spark/DorisTestBase.java | 137 --------
 .../spark/container/AbstractContainerTestBase.java | 118 +++++++
 .../doris/spark/container/ContainerUtils.java      | 188 ++++++++++
 .../spark/container/instance/ContainerService.java |  52 +++
 .../spark/container/instance/DorisContainer.java   | 276 +++++++++++++++
 .../container/instance/DorisCustomerContainer.java | 138 ++++++++
 .../apache/doris/spark/sql/DorisReaderITCase.scala | 386 +++++++++++++++++----
 .../apache/doris/spark/sql/DorisWriterITCase.scala | 100 +++---
 .../test/resources/container/ddl/read_all_type.sql |  50 +++
 .../test/resources/container/ddl/read_bitmap.sql   |  17 +
 .../src/test/resources/docker/doris/be.conf        |  99 ++++++
 .../src/test/resources/docker/doris/fe.conf        |  74 ++++
 .../src/test/resources/log4j.properties            |  23 ++
 .../read/expression/V2ExpressionBuilder.scala      |  15 +-
 25 files changed, 1540 insertions(+), 363 deletions(-)

diff --git a/.github/workflows/run-itcase-12.yml 
b/.github/workflows/run-itcase-12.yml
deleted file mode 100644
index fd28357..0000000
--- a/.github/workflows/run-itcase-12.yml
+++ /dev/null
@@ -1,51 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
----
-name: Run ITCases 1.2
-on:
-  pull_request:
-  push:
-
-jobs:
-  build-extension:
-    name: "Run ITCases 1.2"
-    runs-on: ubuntu-latest
-    defaults:
-      run:
-        shell: bash
-    steps:
-    - name: Checkout
-      uses: actions/checkout@master
-
-    - name: Setup java
-      uses: actions/setup-java@v2
-      with:
-        distribution: adopt
-        java-version: '8'
-
-    - name: Run ITCases for spark 2
-      run: |
-        cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 
-pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="adamlee489/doris:1.2.7.1_x86"
-
-    - name: Run ITCases for spark 3.1
-      run: |
-        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl 
spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="adamlee489/doris:1.2.7.1_x86"
-
-    - name: Run ITCases for spark 3.3
-      run: |
-        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl 
spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="adamlee489/doris:1.2.7.1_x86"
diff --git a/.github/workflows/run-itcase-20.yml 
b/.github/workflows/run-itcase.yml
similarity index 87%
rename from .github/workflows/run-itcase-20.yml
rename to .github/workflows/run-itcase.yml
index b0f31c0..10be11e 100644
--- a/.github/workflows/run-itcase-20.yml
+++ b/.github/workflows/run-itcase.yml
@@ -16,14 +16,14 @@
 # under the License.
 #
 ---
-name: Run ITCases 2.0
+name: Run ITCases
 on:
   pull_request:
   push:
 
 jobs:
   build-extension:
-    name: "Run ITCases 2.0"
+    name: "Run ITCases"
     runs-on: ubuntu-latest
     defaults:
       run:
@@ -40,13 +40,13 @@ jobs:
  
     - name: Run ITCases for spark 2
       run: |
-        cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 
-pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="adamlee489/doris:2.0.3"
+        cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 
-pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="apache/doris:doris-all-in-one-2.1.0"
 
     - name: Run ITCases for spark 3.1
       run: |
-        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl 
spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="adamlee489/doris:2.0.3"
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl 
spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="apache/doris:doris-all-in-one-2.1.0"
 
     - name: Run ITCases for spark 3.3
       run: |
-        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl 
spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="adamlee489/doris:2.0.3"
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl 
spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" 
-Dimage="apache/doris:doris-all-in-one-2.1.0"
     
\ No newline at end of file
diff --git a/.licenserc.yaml b/.licenserc.yaml
index e5af614..4feebdc 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -5,33 +5,15 @@ header:
 
   paths-ignore:
     - 'dist'
-    - 'licenses'
-    - '**/*.md'
-    - 'LICENSE'
+    - 'LICENSE.txt'
+    - 'NOTICE.txt'
     - 'NOTICE'
-    - 'DISCLAIMER'
-    - '.clang-format'
-    - '.clang-format-ignore'
-    - '.gitattributes'
     - '.gitignore'
-    - '.gitmodules'
+    - '.github/PULL_REQUEST_TEMPLATE.md'
     - '.licenserc.yaml'
-    - '.rat-excludes'
-    - 'be/src/common/status.cpp'
-    - 'be/src/common/status.h'
-    - 'be/src/env/env.h'
-    - 'be/src/env/env_posix.cpp'
-    - '**/glibc-compatibility/**'
-    - '**/gutil/**'
-    - '**/test_data/**'
-    - '**/jmockit/**'
-    - '**/*.json'
-    - '**/*.dat'
-    - '**/*.svg'
-    - '**/*.md5'
-    - '**/*.patch'
-    - '**/*.log'
     - 'custom_env.sh.tpl'
     - '**/*.csv'
+    - '**/jmockit/**'
+    - 
'spark-doris-connector/spark-doris-connector-it/src/test/resources/container/'
 
   comment: on-failure
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index 7dbb72b..373910c 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -32,6 +32,7 @@ import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.OptionRequiredException;
 import org.apache.doris.spark.rest.models.Field;
 import org.apache.doris.spark.rest.models.Schema;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -259,7 +260,7 @@ public abstract class AbstractThriftReader extends 
DorisReader {
             if (readColumn.contains(" AS ")) {
                 int asIdx = readColumn.indexOf(" AS ");
                 String realColumn = readColumn.substring(asIdx + 
4).trim().replaceAll("`", "");
-                if (fieldTypeMap.containsKey(realColumn) && 
scanTypeMap.containsKey(realColumn)
+                if (fieldTypeMap.containsKey(realColumn)
                         && 
("BITMAP".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType())
                         || 
"HLL".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()))) {
                     newFieldList.add(new Field(realColumn, 
TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 4623d65..779d622 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -17,17 +17,6 @@
 
 package org.apache.doris.spark.client.read;
 
-import org.apache.arrow.adbc.core.AdbcConnection;
-import org.apache.arrow.adbc.core.AdbcDatabase;
-import org.apache.arrow.adbc.core.AdbcDriver;
-import org.apache.arrow.adbc.core.AdbcException;
-import org.apache.arrow.adbc.core.AdbcStatement;
-import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
-import org.apache.arrow.flight.Location;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.ipc.ArrowReader;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.sdk.thrift.TPrimitiveType;
 import org.apache.doris.spark.client.DorisFrontendClient;
 import org.apache.doris.spark.client.entity.DorisReaderPartition;
@@ -39,6 +28,18 @@ import 
org.apache.doris.spark.exception.OptionRequiredException;
 import org.apache.doris.spark.exception.ShouldNeverHappenException;
 import org.apache.doris.spark.rest.models.Field;
 import org.apache.doris.spark.rest.models.Schema;
+
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ import java.util.stream.Collectors;
 public class DorisFlightSqlReader extends DorisReader {
 
     private static final Logger log = 
LoggerFactory.getLogger(DorisFlightSqlReader.class);
+    private static final String PREFIX = "/* ApplicationName=Spark 
ArrowFlightSQL Query */";
     private final AtomicBoolean endOfStream = new AtomicBoolean(false);
     private final DorisFrontendClient frontendClient;
     private final Schema schema;
@@ -136,6 +138,7 @@ public class DorisFlightSqlReader extends DorisReader {
     private ArrowReader executeQuery() throws AdbcException, 
OptionRequiredException {
         AdbcStatement statement = connection.createStatement();
         String flightSql = generateQuerySql(partition);
+        log.info("Query SQL Sending to Doris FE is: {}", flightSql);
         statement.setSqlQuery(flightSql);
         AdbcStatement.QueryResult queryResult = statement.executeQuery();
         return queryResult.getReader();
@@ -147,7 +150,7 @@ public class DorisFlightSqlReader extends DorisReader {
         String tablets = String.format("TABLET(%s)", 
StringUtils.join(partition.getTablets(), ","));
         String predicates = partition.getFilters().length == 0 ? "" : " WHERE 
" + String.join(" AND ", partition.getFilters());
         String limit = partition.getLimit() > 0 ? " LIMIT " + 
partition.getLimit() : "";
-        return String.format("SELECT %s FROM %s %s%s%s", columns, 
fullTableName, tablets, predicates, limit);
+        return PREFIX + String.format("SELECT %s FROM %s %s%s%s", columns, 
fullTableName, tablets, predicates, limit);
     }
 
     protected Schema processDorisSchema(DorisReaderPartition partition) throws 
Exception {
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
index 002b58b..bc6c410 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.spark.client.read;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.doris.spark.client.DorisFrontendClient;
 import org.apache.doris.spark.client.entity.Backend;
 import org.apache.doris.spark.client.entity.DorisReaderPartition;
@@ -27,6 +26,8 @@ import org.apache.doris.spark.rest.models.Field;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.util.DorisDialects;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +55,7 @@ public class ReaderPartitionGenerator {
             originReadCols = new String[0];
         }
         String[] filters = config.contains(DorisOptions.DORIS_FILTER_QUERY) ?
-                config.getValue(DorisOptions.DORIS_FILTER_QUERY).split("\\.") 
: new String[0];
+                new String[]{config.getValue(DorisOptions.DORIS_FILTER_QUERY)} 
: new String[0];
         return generatePartitions(config, originReadCols, filters, -1, 
datetimeJava8ApiEnabled);
     }
 
@@ -131,7 +132,8 @@ public class ReaderPartitionGenerator {
 
     protected static String[] getFinalReadColumns(DorisConfig config, 
DorisFrontendClient frontendClient, String db, String table, String[] 
readFields) throws Exception {
         Schema tableSchema = frontendClient.getTableSchema(db, table);
-        Map<String, String> fieldTypeMap = 
tableSchema.getProperties().stream().collect(Collectors.toMap(Field::getName, 
Field::getType));
+        Map<String, String> fieldTypeMap = 
tableSchema.getProperties().stream().collect(
+                Collectors.toMap(Field::getName, Field::getType));
         Boolean bitmapToString = 
config.getValue(DorisOptions.DORIS_READ_BITMAP_TO_STRING);
         Boolean bitmapToBase64 = 
config.getValue(DorisOptions.DORIS_READ_BITMAP_TO_BASE64);
         return 
Arrays.stream(readFields).filter(fieldTypeMap::containsKey).map(readField -> {
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index bf576f1..d937edd 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -17,6 +17,11 @@
 
 package org.apache.doris.spark.client.read;
 
+import org.apache.doris.sdk.thrift.TScanBatchResult;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.util.IPUtils;
+
 import com.google.common.base.Preconditions;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BaseIntVector;
@@ -44,10 +49,6 @@ import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.doris.sdk.thrift.TScanBatchResult;
-import org.apache.doris.spark.exception.DorisException;
-import org.apache.doris.spark.rest.models.Schema;
-import org.apache.doris.spark.util.IPUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +65,6 @@ import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.temporal.ChronoField;
@@ -74,7 +74,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.TimeZone;
 
 /**
  * row batch data container.
@@ -266,7 +265,7 @@ public class RowBatch implements Serializable {
                                 byte[] bytes = largeIntVector.get(rowIndex);
                                 ArrayUtils.reverse(bytes);
                                 BigInteger largeInt = new BigInteger(bytes);
-                                addValueToRow(rowIndex, 
Decimal.apply(largeInt));
+                                addValueToRow(rowIndex, largeInt.toString());
                             }
                         } else {
                             VarCharVector largeIntVector = (VarCharVector) 
curFieldVector;
@@ -276,8 +275,7 @@ public class RowBatch implements Serializable {
                                     continue;
                                 }
                                 String stringValue = new 
String(largeIntVector.get(rowIndex));
-                                BigInteger largeInt = new 
BigInteger(stringValue);
-                                addValueToRow(rowIndex, 
Decimal.apply(largeInt));
+                                addValueToRow(rowIndex, stringValue);
                             }
                         }
                         break;
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java
new file mode 100644
index 0000000..848ac5f
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.exception;
+
+/** Doris runtime exception. */
+public class DorisRuntimeException extends RuntimeException {
+    public DorisRuntimeException() {
+        super();
+    }
+
+    public DorisRuntimeException(String message) {
+        super(message);
+    }
+
+    public DorisRuntimeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DorisRuntimeException(Throwable cause) {
+        super(cause);
+    }
+
+    protected DorisRuntimeException(
+            String message,
+            Throwable cause,
+            boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java
new file mode 100644
index 0000000..a13181e
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.rest.models;
+
+public enum DataModel {
+    DUPLICATE,
+    UNIQUE,
+    UNIQUE_MOR,
+    AGGREGATE
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
index e694083..91b8317 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
@@ -42,7 +42,7 @@ object SchemaConvertors {
       case "BINARY" => DataTypes.BinaryType
       case "DECIMAL" => DecimalType(precision, scale)
       case "CHAR" => DataTypes.StringType
-      case "LARGEINT" => DecimalType(38, 0)
+      case "LARGEINT" => DataTypes.StringType
       case "VARCHAR" => DataTypes.StringType
       case "JSON" => DataTypes.StringType
       case "JSONB" => DataTypes.StringType
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
 
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index 5a55a95..09f2e07 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.doris.spark.client.read;
 
+import org.apache.doris.sdk.thrift.TScanBatchResult;
+import org.apache.doris.sdk.thrift.TStatus;
+import org.apache.doris.sdk.thrift.TStatusCode;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.rest.models.Schema;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -51,11 +57,6 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.doris.sdk.thrift.TScanBatchResult;
-import org.apache.doris.sdk.thrift.TStatus;
-import org.apache.doris.sdk.thrift.TStatusCode;
-import org.apache.doris.spark.exception.DorisException;
-import org.apache.doris.spark.rest.models.Schema;
 import org.apache.spark.sql.types.Decimal;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import org.junit.Assert;
@@ -610,8 +611,8 @@ public class RowBatchTest {
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
 
-        Assert.assertEquals(Decimal.apply(new 
BigInteger("9223372036854775808")), actualRow0.get(0));
-        Assert.assertEquals(Decimal.apply(new 
BigInteger("9223372036854775809")), actualRow0.get(1));
+        Assert.assertEquals("9223372036854775808", actualRow0.get(0));
+        Assert.assertEquals("9223372036854775809", actualRow0.get(1));
 
         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java
deleted file mode 100644
index f7c6f0b..0000000
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark;
-
-import static org.awaitility.Awaitility.given;
-import static org.awaitility.Durations.ONE_SECOND;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
-import java.util.stream.Stream;
-
-public abstract class DorisTestBase {
-    public static final String PASSWORD = "";
-    protected static final Logger LOG = 
LoggerFactory.getLogger(DorisTestBase.class);
-    protected static final String DORIS_DOCKER_IMAGE = 
System.getProperty("image");
-    protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-    protected static final String URL = "jdbc:mysql://%s:9030";
-    protected static final String USERNAME = "root";
-    protected static final GenericContainer DORIS_CONTAINER = 
createDorisContainer();
-    private static final String DRIVER_JAR =
-            
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";;
-    protected static Connection connection;
-
-    protected static String getFenodes() {
-        return DORIS_CONTAINER.getHost() + ":8030";
-    }
-
-    @BeforeClass
-    public static void startContainers() {
-        LOG.info("Starting containers...");
-        Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
-        given().ignoreExceptions()
-                .await()
-                .atMost(300, TimeUnit.SECONDS)
-                .pollInterval(ONE_SECOND)
-                .untilAsserted(DorisTestBase::initializeJdbcConnection);
-        LOG.info("Containers are started.");
-    }
-
-    @AfterClass
-    public static void stopContainers() {
-        LOG.info("Stopping containers...");
-        DORIS_CONTAINER.stop();
-        LOG.info("Containers are stopped.");
-    }
-
-    public static GenericContainer createDorisContainer() {
-        GenericContainer container =
-                new GenericContainer<>(DORIS_DOCKER_IMAGE)
-                        .withNetwork(Network.newNetwork())
-                        .withNetworkAliases("DorisContainer")
-                        .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
-                        .withEnv("FE_ID", "1")
-                        .withEnv("CURRENT_BE_IP", "127.0.0.1")
-                        .withEnv("CURRENT_BE_PORT", "9050")
-                        .withCommand("ulimit -n 65536")
-                        .withCreateContainerCmdModifier(
-                                cmd -> cmd.getHostConfig().withMemorySwap(0L))
-                        .withPrivilegedMode(true)
-                        .withLogConsumer(
-                                new Slf4jLogConsumer(
-                                        
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));
-
-        container.setPortBindings(
-                Arrays.asList(
-                        String.format("%s:%s", "8030", "8030"),
-                        String.format("%s:%s", "9030", "9030"),
-                        String.format("%s:%s", "9060", "9060"),
-                        String.format("%s:%s", "8040", "8040")));
-
-        return container;
-    }
-
-    protected static void initializeJdbcConnection() throws SQLException, 
MalformedURLException {
-        URLClassLoader urlClassLoader =
-                new URLClassLoader(
-                        new URL[]{new URL(DRIVER_JAR)}, 
DorisTestBase.class.getClassLoader());
-        LOG.info("Try to connect to Doris...");
-        Thread.currentThread().setContextClassLoader(urlClassLoader);
-        connection =
-                DriverManager.getConnection(
-                        String.format(URL, DORIS_CONTAINER.getHost()), 
USERNAME, PASSWORD);
-        try (Statement statement = connection.createStatement()) {
-            ResultSet resultSet;
-            do {
-                LOG.info("Wait for the Backend to start successfully...");
-                resultSet = statement.executeQuery("show backends");
-            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
-        }
-        LOG.info("Connected to Doris successfully...");
-    }
-
-    private static boolean isBeReady(ResultSet rs, Duration duration) throws 
SQLException {
-        LockSupport.parkNanos(duration.toNanos());
-        if (rs.next()) {
-            String isAlive = rs.getString("Alive").trim();
-            String totalCap = rs.getString("TotalCapacity").trim();
-            return "true".equalsIgnoreCase(isAlive) && 
!"0.000".equalsIgnoreCase(totalCap);
-        }
-        return false;
-    }
-
-}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
new file mode 100644
index 0000000..97e7e26
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container;
+
+import org.apache.doris.spark.container.instance.ContainerService;
+import org.apache.doris.spark.container.instance.DorisContainer;
+import org.apache.doris.spark.container.instance.DorisCustomerContainer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public abstract class AbstractContainerTestBase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractContainerTestBase.class);
+    protected static ContainerService dorisContainerService;
+    public static final int DEFAULT_PARALLELISM = 2;
+
+    @BeforeClass
+    public static void initContainers() {
+        LOG.info("Trying to start doris containers.");
+        initDorisContainer();
+    }
+
+    private static void initDorisContainer() {
+        if (Objects.nonNull(dorisContainerService) && 
dorisContainerService.isRunning()) {
+            LOG.info("The doris container has been started and is running 
status.");
+            return;
+        }
+        Boolean customerEnv = 
Boolean.valueOf(System.getProperty("customer_env", "false"));
+        dorisContainerService = customerEnv ? new DorisCustomerContainer() : 
new DorisContainer();
+        dorisContainerService.startContainer();
+        LOG.info("Doris container was started.");
+    }
+
+    protected static Connection getDorisQueryConnection() {
+        return dorisContainerService.getQueryConnection();
+    }
+
+    protected static Connection getDorisQueryConnection(String database) {
+        return dorisContainerService.getQueryConnection(database);
+    }
+
+    protected String getFenodes() {
+        return dorisContainerService.getFenodes();
+    }
+
+    protected String getBenodes() {
+        return dorisContainerService.getBenodes();
+    }
+
+    protected String getDorisUsername() {
+        return dorisContainerService.getUsername();
+    }
+
+    protected String getDorisPassword() {
+        return dorisContainerService.getPassword();
+    }
+
+    protected String getDorisQueryUrl() {
+        return dorisContainerService.getJdbcUrl();
+    }
+
+    protected String getDorisInstanceHost() {
+        return dorisContainerService.getInstanceHost();
+    }
+
+    public static void closeContainers() {
+        LOG.info("Starting to close containers.");
+        closeDorisContainer();
+    }
+
+    private static void closeDorisContainer() {
+        if (Objects.isNull(dorisContainerService)) {
+            return;
+        }
+        dorisContainerService.close();
+        LOG.info("Doris container was closed.");
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+    public static void assertEqualsInAnyOrder(List<Object> expected, 
List<Object> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEqualsInOrder(
+                expected.stream().sorted().collect(Collectors.toList()),
+                actual.stream().sorted().collect(Collectors.toList()));
+    }
+
+    public static void assertEqualsInOrder(List<Object> expected, List<Object> 
actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new 
Object[0]));
+    }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java
new file mode 100644
index 0000000..1b54c69
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.spark.exception.DorisRuntimeException;
+import org.junit.Assert;
+import org.slf4j.Logger;
+
+public class ContainerUtils {
+
+    public static void executeSQLStatement(Connection connection, Logger 
logger, String... sql) {
+        if (Objects.isNull(sql) || sql.length == 0) {
+            return;
+        }
+        try (Statement statement = connection.createStatement()) {
+            for (String s : sql) {
+                if (StringUtils.isNotEmpty(s)) {
+                    logger.info("start to execute sql={}", s);
+                    statement.execute(s);
+                }
+            }
+        } catch (SQLException e) {
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    public static List<String> executeSQLStatement(
+            Connection connection, Logger logger, String sql, int columnSize) {
+        List<String> result = new ArrayList<>();
+        if (Objects.isNull(sql)) {
+            return result;
+        }
+        try (Statement statement = connection.createStatement()) {
+            logger.info("start to execute sql={}", sql);
+            ResultSet resultSet = statement.executeQuery(sql);
+
+            while (resultSet.next()) {
+                StringJoiner sb = new StringJoiner(",");
+                for (int i = 1; i <= columnSize; i++) {
+                    Object value = resultSet.getObject(i);
+                    sb.add(String.valueOf(value));
+                }
+                result.add(sb.toString());
+            }
+            return result;
+        } catch (SQLException e) {
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    public static String loadFileContent(String resourcePath) {
+        try (InputStream stream =
+                
ContainerUtils.class.getClassLoader().getResourceAsStream(resourcePath)) {
+            return new BufferedReader(new 
InputStreamReader(Objects.requireNonNull(stream)))
+                    .lines()
+                    .collect(Collectors.joining("\n"));
+        } catch (IOException e) {
+            throw new DorisRuntimeException("Failed to read " + resourcePath + 
" file.", e);
+        }
+    }
+
+    public static List<String> parseFileArgs(String resourcePath) {
+        String fileContent = ContainerUtils.loadFileContent(resourcePath);
+        String[] args = fileContent.split("\n");
+        List<String> argList = new ArrayList<>();
+        for (String arg : args) {
+            String[] split = arg.trim().split("\\s+");
+            List<String> stringList =
+                    Arrays.stream(split)
+                            .map(ContainerUtils::removeQuotes)
+                            .collect(Collectors.toList());
+            argList.addAll(stringList);
+        }
+        return argList;
+    }
+
+    private static String removeQuotes(String str) {
+        if (str == null || str.length() < 2) {
+            return str;
+        }
+        if (str.startsWith("\"") && str.endsWith("\"")) {
+            return str.substring(1, str.length() - 1);
+        }
+        if (str.startsWith("\\'") && str.endsWith("\\'")) {
+            return str.substring(1, str.length() - 1);
+        }
+        return str;
+    }
+
+    public static String[] parseFileContentSQL(String resourcePath) {
+        String fileContent = loadFileContent(resourcePath);
+        return 
Arrays.stream(fileContent.split(";")).map(String::trim).toArray(String[]::new);
+    }
+
+    public static void checkResult(
+            Connection connection,
+            Logger logger,
+            List<String> expected,
+            String query,
+            int columnSize) {
+        checkResult(connection, logger, expected, query, columnSize, true);
+    }
+
+    public static void checkResult(
+            Connection connection,
+            Logger logger,
+            List<String> expected,
+            String query,
+            int columnSize,
+            boolean ordered) {
+        List<String> actual = getResult(connection, logger, expected, query, 
columnSize);
+        if (ordered) {
+            Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+        } else {
+            Assert.assertEquals(expected.size(), actual.size());
+            Assert.assertArrayEquals(
+                    expected.stream().sorted().toArray(Object[]::new),
+                    actual.stream().sorted().toArray(Object[]::new));
+        }
+    }
+
+    public static List<String> getResult(
+            Connection connection,
+            Logger logger,
+            List<String> expected,
+            String query,
+            int columnSize) {
+        List<String> actual = new ArrayList<>();
+        try (Statement statement = connection.createStatement()) {
+            ResultSet sinkResultSet = statement.executeQuery(query);
+            while (sinkResultSet.next()) {
+                List<String> row = new ArrayList<>();
+                for (int i = 1; i <= columnSize; i++) {
+                    Object value = sinkResultSet.getObject(i);
+                    if (value == null) {
+                        row.add("null");
+                    } else {
+                        row.add(value.toString());
+                    }
+                }
+                actual.add(StringUtils.join(row, ","));
+            }
+        } catch (SQLException e) {
+            logger.info(
+                    "Failed to check query result. expected={}, actual={}",
+                    String.join(",", expected),
+                    String.join(",", actual),
+                    e);
+            throw new DorisRuntimeException(e);
+        }
+        logger.info(
+                "checking test result. expected={}, actual={}",
+                String.join(",", expected),
+                String.join(",", actual));
+        return actual;
+    }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
new file mode 100644
index 0000000..3ec7ee5
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container.instance;
+
+import org.apache.doris.spark.exception.DorisRuntimeException;
+
+import java.sql.Connection;
+
+public interface ContainerService {
+    void startContainer();
+
+    default void restartContainer() {
+        throw new DorisRuntimeException("Only doris docker container can 
implemented.");
+    };
+
+    boolean isRunning();
+
+    Connection getQueryConnection();
+
+    Connection getQueryConnection(String database);
+
+    String getJdbcUrl();
+
+    String getInstanceHost();
+
+    Integer getMappedPort(int originalPort);
+
+    String getUsername();
+
+    String getPassword();
+
+    String getFenodes();
+
+    String getBenodes();
+
+    void close();
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
new file mode 100644
index 0000000..7c9297e
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
@@ -0,0 +1,276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container.instance;
+
+import org.apache.doris.spark.exception.DorisRuntimeException;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
+public class DorisContainer implements ContainerService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisContainer.class);
+    private static final String DEFAULT_DOCKER_IMAGE = 
"apache/doris:doris-all-in-one-2.1.0";
+    private static final String DORIS_DOCKER_IMAGE =
+            System.getProperty("image") == null
+                    ? DEFAULT_DOCKER_IMAGE
+                    : System.getProperty("image");
+    private static final String DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";;
+    private static final String JDBC_URL = "jdbc:mysql://%s:9030";
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private final GenericContainer dorisContainer;
+
+    public DorisContainer() {
+        dorisContainer = createDorisContainer();
+    }
+
+    public GenericContainer createDorisContainer() {
+        LOG.info("Will create doris containers.");
+        GenericContainer container =
+                new GenericContainer<>(DORIS_DOCKER_IMAGE)
+                        .withNetwork(Network.newNetwork())
+                        .withNetworkAliases("DorisContainer")
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
+                        // use customer conf
+                        .withCopyFileToContainer(
+                                
MountableFile.forClasspathResource("docker/doris/be.conf"),
+                                "/opt/apache-doris/be/conf/be.conf")
+                        .withCopyFileToContainer(
+                                
MountableFile.forClasspathResource("docker/doris/fe.conf"),
+                                "/opt/apache-doris/fe/conf/fe.conf")
+                        .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610);
+
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", "8030", "8030"),
+                        String.format("%s:%s", "9030", "9030"),
+                        String.format("%s:%s", "9060", "9060"),
+                        String.format("%s:%s", "8040", "8040"),
+                        String.format("%s:%s", "9611", "9611"),
+                        String.format("%s:%s", "9610", "9610")));
+        return container;
+    }
+
+    public void startContainer() {
+        try {
+            LOG.info("Starting doris containers.");
+            // singleton doris container
+            dorisContainer.start();
+            initializeJdbcConnection();
+            initializeVariables();
+            printClusterStatus();
+        } catch (Exception ex) {
+            LOG.error("Failed to start containers doris", ex);
+            throw new DorisRuntimeException("Failed to start containers 
doris", ex);
+        }
+        LOG.info("Doris container started successfully.");
+    }
+
+    @Override
+    public void restartContainer() {
+        LOG.info("Restart doris container.");
+        dorisContainer
+                .getDockerClient()
+                .restartContainerCmd(dorisContainer.getContainerId())
+                .exec();
+    }
+
+    @Override
+    public boolean isRunning() {
+        return dorisContainer.isRunning();
+    }
+
+    @Override
+    public Connection getQueryConnection() {
+       return getQueryConnection("");
+    }
+
+    @Override
+    public Connection getQueryConnection(String database) {
+        LOG.info("Try to get query connection from doris.");
+        String jdbcUrl = String.format(JDBC_URL, dorisContainer.getHost());
+        jdbcUrl = jdbcUrl + "/" + database;
+        try {
+            return DriverManager.getConnection(jdbcUrl, USERNAME, PASSWORD);
+        } catch (SQLException e) {
+            LOG.info("Failed to get doris query connection. jdbcUrl={}", 
jdbcUrl, e);
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    private void initializeVariables() throws Exception {
+        try (Connection connection = getQueryConnection();
+                Statement statement = connection.createStatement()) {
+            LOG.info("init doris cluster variables.");
+            // avoid arrow flight sql reading bug
+            statement.execute("SET PROPERTY FOR 'root' 'max_user_connections' 
= '1024';");
+        }
+        LOG.info("Init variables successfully.");
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return String.format(JDBC_URL, dorisContainer.getHost());
+    }
+
+    @Override
+    public String getInstanceHost() {
+        return dorisContainer.getHost();
+    }
+
+    @Override
+    public Integer getMappedPort(int originalPort) {
+        return dorisContainer.getMappedPort(originalPort);
+    }
+
+    @Override
+    public String getUsername() {
+        return USERNAME;
+    }
+
+    @Override
+    public String getPassword() {
+        return PASSWORD;
+    }
+
+    @Override
+    public String getFenodes() {
+        return dorisContainer.getHost() + ":8030";
+    }
+
+    @Override
+    public String getBenodes() {
+        return dorisContainer.getHost() + ":8040";
+    }
+
+    public void close() {
+        LOG.info("Doris container is about to be close.");
+        dorisContainer.close();
+        LOG.info("Doris container closed successfully.");
+    }
+
+    private void initializeJDBCDriver() throws MalformedURLException {
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(DRIVER_JAR)}, 
DorisContainer.class.getClassLoader());
+        LOG.info("Try to connect to Doris.");
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+    }
+
+    private void initializeJdbcConnection() throws Exception {
+        initializeJDBCDriver();
+        try (Connection connection = getQueryConnection();
+                Statement statement = connection.createStatement()) {
+            ResultSet resultSet;
+            do {
+                LOG.info("Waiting for the Backend to start successfully.");
+                resultSet = statement.executeQuery("show backends");
+            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+        }
+        LOG.info("Connected to Doris successfully.");
+    }
+
+    private boolean isBeReady(ResultSet rs, Duration duration) throws 
SQLException {
+        LockSupport.parkNanos(duration.toNanos());
+        if (rs.next()) {
+            String isAlive = rs.getString("Alive").trim();
+            String totalCap = rs.getString("TotalCapacity").trim();
+            return Boolean.toString(true).equalsIgnoreCase(isAlive)
+                    && !"0.000".equalsIgnoreCase(totalCap);
+        }
+        return false;
+    }
+
+    private void printClusterStatus() throws Exception {
+        LOG.info("Current machine IP: {}", dorisContainer.getHost());
+        echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
+        echo("sh", "-c", "free -h");
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(JDBC_URL, 
dorisContainer.getHost()),
+                                USERNAME,
+                                PASSWORD);
+                Statement statement = connection.createStatement()) {
+            ResultSet showFrontends = statement.executeQuery("show frontends");
+            LOG.info("Frontends status: {}", convertList(showFrontends));
+            ResultSet showBackends = statement.executeQuery("show backends");
+            LOG.info("Backends status: {}", convertList(showBackends));
+        }
+    }
+
+    private void echo(String... cmd) {
+        try {
+            Process p = Runtime.getRuntime().exec(cmd);
+            InputStream is = p.getInputStream();
+            BufferedReader reader = new BufferedReader(new 
InputStreamReader(is));
+            String line;
+            while ((line = reader.readLine()) != null) {
+                System.out.println(line);
+            }
+            p.waitFor();
+            is.close();
+            reader.close();
+            p.destroy();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private List<Map> convertList(ResultSet rs) throws SQLException {
+        List<Map> list = new ArrayList<>();
+        ResultSetMetaData metaData = rs.getMetaData();
+        int columnCount = metaData.getColumnCount();
+        while (rs.next()) {
+            Map<String, Object> rowData = new HashMap<>();
+            for (int i = 1; i <= columnCount; i++) {
+                rowData.put(metaData.getColumnName(i), rs.getObject(i));
+            }
+            list.add(rowData);
+        }
+        return list;
+    }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
new file mode 100644
index 0000000..4ba4e74
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container.instance;
+
+import org.apache.doris.spark.exception.DorisRuntimeException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/** Using a custom Doris environment */
+public class DorisCustomerContainer implements ContainerService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisCustomerContainer.class);
+    private static final String JDBC_URL = "jdbc:mysql://%s:%s";
+
+    @Override
+    public void startContainer() {
+        LOG.info("Using doris customer containers env.");
+        checkParams();
+        if (!isRunning()) {
+            throw new DorisRuntimeException(
+                    "Backend is not alive. Please check the doris cluster.");
+        }
+    }
+
+    private void checkParams() {
+        Preconditions.checkArgument(
+                System.getProperty("doris_host") != null, "doris_host is 
required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_query_port") != null, 
"doris_query_port is required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_http_port") != null, 
"doris_http_port is required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_user") != null, "doris_user is 
required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_passwd") != null, "doris_passwd is 
required.");
+    }
+
+    @Override
+    public boolean isRunning() {
+        try (Connection conn = getQueryConnection();
+                Statement stmt = conn.createStatement()) {
+            ResultSet showBackends = stmt.executeQuery("show backends");
+            while (showBackends.next()) {
+                String isAlive = showBackends.getString("Alive").trim();
+                if (Boolean.toString(true).equalsIgnoreCase(isAlive)) {
+                    return true;
+                }
+            }
+        } catch (SQLException e) {
+            LOG.error("Failed to connect doris cluster.", e);
+            return false;
+        }
+        return false;
+    }
+
+    @Override
+    public Connection getQueryConnection() {
+        return getQueryConnection("");
+    }
+
+    @Override
+    public Connection getQueryConnection(String database) {
+        LOG.info("Try to get query connection from doris.");
+        String jdbcUrl =
+                String.format(
+                        JDBC_URL,
+                        System.getProperty("doris_host"),
+                        System.getProperty("doris_query_port"));
+        jdbcUrl = jdbcUrl + "/" + database;
+        try {
+            return DriverManager.getConnection(jdbcUrl, getUsername(), 
getPassword());
+        } catch (SQLException e) {
+            LOG.info("Failed to get doris query connection. jdbcUrl={}", 
jdbcUrl, e);
+            throw new DorisRuntimeException(e);
+        }
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return String.format(
+                JDBC_URL, System.getProperty("doris_host"), 
System.getProperty("doris_query_port"));
+    }
+
+    @Override
+    public String getInstanceHost() {
+        return System.getProperty("doris_host");
+    }
+
+    @Override
+    public Integer getMappedPort(int originalPort) {
+        return originalPort;
+    }
+
+    @Override
+    public String getUsername() {
+        return System.getProperty("doris_user");
+    }
+
+    @Override
+    public String getPassword() {
+        return System.getProperty("doris_passwd");
+    }
+
+    @Override
+    public String getFenodes() {
+        return System.getProperty("doris_host") + ":" + 
System.getProperty("doris_http_port");
+    }
+
+    @Override
+    public String getBenodes() {
+        return null;
+    }
+
+    @Override
+    public void close() {}
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 2d7930a..67a0688 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -17,34 +17,66 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.doris.spark.{DorisTestBase, sparkContextFunctions}
-import org.apache.spark.sql.SparkSession
+import 
org.apache.doris.spark.container.AbstractContainerTestBase.getDorisQueryConnection
+import org.apache.doris.spark.container.{AbstractContainerTestBase, 
ContainerUtils}
+import org.apache.doris.spark.rest.models.DataModel
+import org.apache.doris.spark.sparkContextFunctions
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.Test
+import org.junit.Assert.fail
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.slf4j.LoggerFactory
 
-import java.sql.Statement
+import java.sql.{Date, Timestamp}
 
-class DorisReaderITCase extends DorisTestBase {
+object DorisReaderITCase {
+  @Parameterized.Parameters(name = "readMode: {0}, flightSqlPort: {1}")
+  def parameters(): java.util.Collection[Array[AnyRef]] = {
+    import java.util.Arrays
+    Arrays.asList(
+      Array("thrift": java.lang.String, -1: java.lang.Integer),
+      Array("arrow": java.lang.String, 9611: java.lang.Integer)
+    )
+  }
+}
+
+@RunWith(classOf[Parameterized])
+class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends 
AbstractContainerTestBase {
 
-  val DATABASE: String = "test"
-  val TABLE_READ: String = "tbl_read"
-  val TABLE_READ_TBL: String = "tbl_read_tbl"
+  private val LOG = LoggerFactory.getLogger(classOf[DorisReaderITCase])
+
+  val DATABASE = "test_doris_read"
+  val TABLE_READ = "tbl_read"
+  val TABLE_READ_TBL = "tbl_read_tbl"
+  val TABLE_READ_TBL_ALL_TYPES = "tbl_read_tbl_all_types"
+  val TABLE_READ_TBL_BIT_MAP = "tbl_read_tbl_bitmap"
+
+  @Before
+  def setUp(): Unit = {
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+  }
 
   @Test
   @throws[Exception]
   def testRddSource(): Unit = {
-    initializeTable(TABLE_READ)
 
-    val sparkConf: SparkConf = new 
SparkConf().setMaster("local[*]").setAppName("rddSource")
+    initializeTable(TABLE_READ, DataModel.DUPLICATE)
+    val sparkConf = new 
SparkConf().setMaster("local[*]").setAppName("rddSource")
     val sc = new SparkContext(sparkConf)
     // sc.setLogLevel("DEBUG")
     val dorisSparkRDD = sc.dorisRDD(
       tableIdentifier = Some(DATABASE + "." + TABLE_READ),
       cfg = Some(Map(
-        "doris.fenodes" -> DorisTestBase.getFenodes,
-        "doris.request.auth.user" -> DorisTestBase.USERNAME,
-        "doris.request.auth.password" -> DorisTestBase.PASSWORD,
-        "doris.fe.init.fetch" -> "false"
+        "doris.fenodes" -> getFenodes,
+        "doris.request.auth.user" -> getDorisUsername,
+        "doris.request.auth.password" -> getDorisPassword,
+        "doris.fe.init.fetch" -> "false",
+        "doris.read.mode" -> readMode,
+        "doris.read.arrow-flight-sql.port" -> flightSqlPort.toString
       ))
     )
     val result = dorisSparkRDD.collect()
@@ -56,15 +88,17 @@ class DorisReaderITCase extends DorisTestBase {
   @Test
   @throws[Exception]
   def testDataFrameSource(): Unit = {
-    initializeTable(TABLE_READ_TBL)
+    initializeTable(TABLE_READ_TBL, DataModel.UNIQUE)
 
     val session = SparkSession.builder().master("local[*]").getOrCreate()
     val dorisSparkDF = session.read
       .format("doris")
-      .option("doris.fenodes", DorisTestBase.getFenodes)
+      .option("doris.fenodes", getFenodes)
       .option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL)
-      .option("doris.user", DorisTestBase.USERNAME)
-      .option("doris.password", DorisTestBase.PASSWORD)
+      .option("doris.user", getDorisUsername)
+      .option("doris.password", getDorisPassword)
+      .option("doris.read.mode", readMode)
+      .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString)
       .load()
 
     val result = dorisSparkDF.collect().toList.toString()
@@ -75,7 +109,7 @@ class DorisReaderITCase extends DorisTestBase {
   @Test
   @throws[Exception]
   def testSQLSource(): Unit = {
-    initializeTable(TABLE_READ_TBL)
+    initializeTable(TABLE_READ_TBL, DataModel.UNIQUE_MOR)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
     session.sql(
       s"""
@@ -83,9 +117,11 @@ class DorisReaderITCase extends DorisTestBase {
          |USING doris
          |OPTIONS(
          | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
-         | "fenodes"="${DorisTestBase.getFenodes}",
-         | "user"="${DorisTestBase.USERNAME}",
-         | "password"="${DorisTestBase.PASSWORD}"
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
          |)
          |""".stripMargin)
 
@@ -98,53 +134,44 @@ class DorisReaderITCase extends DorisTestBase {
     assert("List([doris,18], [spark,10])".equals(result))
   }
 
-  @throws[Exception]
-  private def initializeTable(table: String): Unit = {
-    try {
-      val statement: Statement = DorisTestBase.connection.createStatement
-      try {
-        statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", 
DATABASE))
-        statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE, table))
-        statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
-          "`name` varchar(256),\n" +
-          "`age` int\n" +
-          ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
-          "PROPERTIES (\n" +
-          "\"replication_num\" = \"1\"\n" +
-          ")\n", DATABASE, table))
-        statement.execute(String.format("insert into %s.%s  values 
('doris',18)", DATABASE, table))
-        statement.execute(String.format("insert into %s.%s  values 
('spark',10)", DATABASE, table))
-      } finally {
-        if (statement != null) statement.close()
-      }
-    }
+  private def initializeTable(table: String, dataModel: DataModel): Unit = {
+    val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
+    val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else 
",\"enable_unique_key_merge_on_write\" = \"false\""
+    val model = if (dataModel == DataModel.UNIQUE_MOR) 
DataModel.UNIQUE.toString else dataModel.toString
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+      String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+      String.format("CREATE TABLE %s.%s ( \n"
+        + "`name` varchar(256),\n"
+        + "`age` int %s\n"
+        + ") "
+        + " %s KEY(`name`) "
+        + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+        + "PROPERTIES ("
+        + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, 
max, model),
+      String.format("insert into %s.%s  values ('doris',18)", DATABASE, table),
+      String.format("insert into %s.%s  values ('spark',10)", DATABASE, table))
   }
 
-  private def compareCollectResult(a1: Array[AnyRef], a2: Array[AnyRef]): 
Boolean = {
-    if (a1.length == a2.length) {
-      for (idx <- 0 until a1.length) {
-        if (!a1(idx).isInstanceOf[Array[AnyRef]] || 
!a2(idx).isInstanceOf[Array[AnyRef]]) {
-          return false
-        }
-        val arr1 = a1(idx).asInstanceOf[Array[AnyRef]]
-        val arr2 = a2(idx).asInstanceOf[Array[AnyRef]]
-        if (arr1.length != arr2.length) {
-          return false
-        }
-        for (idx2 <- 0 until arr2.length) {
-          if (arr1(idx2) != arr2(idx2)) {
-            return false
-          }
-        }
+  private def compareCollectResult(a1: Array[AnyRef], a2: Array[AnyRef]): 
Boolean = if (a1.length == a2.length) {
+    for (idx <- 0 until a1.length) {
+      if (!a1(idx).isInstanceOf[Array[AnyRef]] || 
!a2(idx).isInstanceOf[Array[AnyRef]]) return false
+      val arr1 = a1(idx).asInstanceOf[Array[AnyRef]]
+      val arr2 = a2(idx).asInstanceOf[Array[AnyRef]]
+      if (arr1.length != arr2.length) return false
+      for (idx2 <- 0 until arr2.length) {
+        if (arr1(idx2) != arr2(idx2)) return false
       }
-      true
-    } else false
-  }
+    }
+    true
+  } else false
 
   @Test
   @throws[Exception]
   def testSQLSourceWithCondition(): Unit = {
-    initializeTable(TABLE_READ_TBL)
+    initializeTable(TABLE_READ_TBL, DataModel.AGGREGATE)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
     session.sql(
       s"""
@@ -152,9 +179,11 @@ class DorisReaderITCase extends DorisTestBase {
          |USING doris
          |OPTIONS(
          | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
-         | "fenodes"="${DorisTestBase.getFenodes}",
-         | "user"="${DorisTestBase.USERNAME}",
-         | "password"="${DorisTestBase.PASSWORD}"
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
          |)
          |""".stripMargin)
 
@@ -167,5 +196,232 @@ class DorisReaderITCase extends DorisTestBase {
     assert("List([doris,18])".equals(result))
   }
 
+  @Test
+  def testReadAllType(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_source
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+         |)
+         |""".stripMargin)
+    session.sql("desc test_source").show(true);
+    val actualData = session.sql(
+      """
+        |select * from test_source order by id
+        |""".stripMargin).collect()
+    session.stop()
+
+    val expectedData = Array(
+      Row(1, true, 127, 32767, 2147483647, 9223372036854775807L, 
"170141183460469231731687303715884105727",
+        3.14f, 2.71828, new java.math.BigDecimal("12345.6789"), 
Date.valueOf("2025-03-11"), Timestamp.valueOf("2025-03-11 12:34:56"), "A", 
"Hello, Doris!", "This is a string",
+        """["Alice","Bob"]""", Map("key1" -> "value1", "key2" -> "value2"), 
"""{"name":"Tom","age":30}""",
+        """{"key":"value"}""", """{"type":"variant","data":123}"""),
+      Row(2, false, -128, -32768, -2147483648, -9223372036854775808L, 
"-170141183460469231731687303715884105728",
+        -1.23f, 0.0001, new java.math.BigDecimal("-9999.9999"), 
Date.valueOf("2024-12-25"), Timestamp.valueOf("2024-12-25 23:59:59"), "B", 
"Doris Test", "Another string!",
+        """["Charlie","David"]""", Map("k1" -> "v1", "k2" -> "v2"), 
"""{"name":"Jerry","age":25}""",
+        """{"status":"ok"}""", """{"data":[1,2,3]}"""),
+      Row(3, true, 0, 0, 0, 0, "0",
+        0.0f, 0.0, new java.math.BigDecimal("0.0000"), 
Date.valueOf("2023-06-15"), Timestamp.valueOf("2023-06-15 08:00:00"), "C", 
"Test Doris", "Sample text",
+        """["Eve","Frank"]""", Map("alpha" -> "beta"), 
"""{"name":"Alice","age":40}""",
+        """{"nested":{"key":"value"}}""", """{"variant":"test"}"""),
+      Row(4, null, null, null, null, null, null,
+        null, null, null, null, null, null, null, null,
+        null, null, null, null, null)
+    )
+
+    val differences = actualData.zip(expectedData).zipWithIndex.flatMap {
+      case ((actualRow, expectedRow), rowIndex) =>
+        actualRow.toSeq.zip(expectedRow.toSeq).zipWithIndex.collect {
+          case ((actualValue, expectedValue), colIndex)
+            if actualValue != expectedValue =>
+            s"Row $rowIndex, Column $colIndex: actual=$actualValue, 
expected=$expectedValue"
+        }
+    }
+
+    if (differences.nonEmpty) {
+      fail(s"Data mismatch found:\n${differences.mkString("\n")}")
+    }
+  }
+
+  @Test
+  def testBitmapRead(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_source
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+         |)
+         |""".stripMargin)
+    session.sql("desc test_source").show(true);
+    val actualData = session.sql(
+      """
+        |select * from test_source order by hour
+        |""".stripMargin).collect()
+    session.stop()
+
+    assert("List([20200622,1,Read unsupported], [20200622,2,Read unsupported], 
[20200622,3,Read unsupported])".equals(actualData.toList.toString()))
+  }
+
+  @Test
+  def testBitmapRead2String(): Unit = {
+    if(readMode.equals("thrift")){
+      return
+    }
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_source
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
+         | "doris.read.bitmap-to-string"="true"
+         |)
+         |""".stripMargin)
+    session.sql("desc test_source").show(true);
+    val actualData = session.sql(
+      """
+        |select * from test_source order by hour
+        |""".stripMargin).collect()
+    session.stop()
+
+    assert("List([20200622,1,243], [20200622,2,1,2,3,4,5,434543], 
[20200622,3,287667876573])"
+      .equals(actualData.toList.toString()))
+  }
+
+  @Test
+  def testBitmapRead2Base64(): Unit = {
+    if(readMode.equals("thrift")){
+      return
+    }
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_source
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
+         | "doris.read.bitmap-to-base64"="true"
+         |)
+         |""".stripMargin)
+    session.sql("desc test_source").show(true);
+    val actualData = session.sql(
+      """
+        |select * from test_source order by hour
+        |""".stripMargin).collect()
+    session.stop()
+
+    assert("List([20200622,1,AfMAAAA=], 
[20200622,2,AjswAQABAAAEAAYAAAABAAEABABvoQ==], [20200622,3,A91yV/pCAAAA])"
+      .equals(actualData.toList.toString()))
+  }
 
+  @Test
+  def testReadPushDownProject(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_source
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}",
+         | "doris.read.mode"="${readMode}",
+         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+         |)
+         |""".stripMargin)
+
+    val intFilter = session.sql(
+      """
+        |select id,c1,c2 from test_source where id = 2 and c1 = false and c4 
!= 3
+        |""".stripMargin).collect()
+
+    assert("List([2,false,-128])".equals(intFilter.toList.toString()))
+
+    val floatFilter = session.sql(
+      """
+        |select id,c3,c4,c7,c9 from test_source where c7 > 0 and c7 < 3.15
+        |""".stripMargin).collect()
+
+    
assert("List([1,32767,2147483647,3.14,12345.6789])".equals(floatFilter.toList.toString()))
+
+    val dateFilter = session.sql(
+      """
+        |select id,c10,c11 from test_source where c10 = '2025-03-11' and c13 
like 'Hello%'
+        |""".stripMargin).collect()
+
+    assert("List([1,2025-03-11,2025-03-11 
12:34:56.0])".equals(dateFilter.toList.toString()))
+
+    val datetimeFilter = session.sql(
+      """
+        |select id,c11,c12 from test_source where c10 < '2025-03-11' and c11 = 
'2024-12-25 23:59:59'
+        |""".stripMargin).collect()
+
+    assert("List([2,2024-12-25 
23:59:59.0,B])".equals(datetimeFilter.toList.toString()))
+
+    val stringFilter = session.sql(
+      """
+        |select id,c13,c14 from test_source where c11 >= '2024-12-25 23:59:59' 
and c13 = 'Hello, Doris!'
+        |""".stripMargin).collect()
+
+    assert("List([1,Hello, Doris!,This is a 
string])".equals(stringFilter.toList.toString()))
+
+    val nullFilter = session.sql(
+      """
+        |select id,c13,c14 from test_source where c14 is null
+        |""".stripMargin).collect()
+
+    assert("List([4,null,null])".equals(nullFilter.toList.toString()))
+
+    val notNullFilter = session.sql(
+      """
+        |select id from test_source where c15 is not null and c12 in ('A', 'B')
+        |""".stripMargin).collect()
+
+    assert("List([1], [2])".equals(notNullFilter.toList.toString()))
+
+    val likeFilter = session.sql(
+      """
+        |select id from test_source where c19 like '%variant%' and c13 like 
'Test%'
+        |""".stripMargin).collect()
+
+    assert("List([3])".equals(likeFilter.toList.toString()))
+    session.stop()
+  }
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
index 6e1bd08..7f1e393 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -17,15 +17,17 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.doris.spark.DorisTestBase
-import org.apache.spark.sql.types.{ArrayType, DataTypes, DecimalType, MapType, 
StructField, StructType}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import 
org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder,
 getDorisQueryConnection}
+import org.apache.doris.spark.container.{AbstractContainerTestBase, 
ContainerUtils}
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.Test
+import org.slf4j.LoggerFactory
 
-import java.sql.{Date, ResultSet, Statement, Timestamp}
-import scala.collection.mutable.ListBuffer
+import java.util
+import scala.collection.JavaConverters._
+class DorisWriterITCase extends AbstractContainerTestBase {
 
-class DorisWriterITCase extends DorisTestBase {
+  private val LOG = LoggerFactory.getLogger(classOf[DorisReaderITCase])
 
   val DATABASE: String = "test"
   val TABLE_CSV: String = "tbl_csv"
@@ -43,10 +45,10 @@ class DorisWriterITCase extends DorisTestBase {
     )).toDF("name", "age")
     df.write
       .format("doris")
-      .option("doris.fenodes", DorisTestBase.getFenodes)
+      .option("doris.fenodes", getFenodes)
       .option("doris.table.identifier", DATABASE + "." + TABLE_CSV)
-      .option("user", DorisTestBase.USERNAME)
-      .option("password", DorisTestBase.PASSWORD)
+      .option("user", getDorisUsername)
+      .option("password", getDorisPassword)
       .option("sink.properties.column_separator", ",")
       .option("sink.properties.line_delimiter", "\n")
       .option("sink.properties.format", "csv")
@@ -55,9 +57,13 @@ class DorisWriterITCase extends DorisTestBase {
     session.stop()
 
     Thread.sleep(10000)
-    val actual = queryResult(TABLE_CSV);
-    val expected = ListBuffer(List("doris_csv", 1), List("spark_csv", 2))
-    assert(expected.equals(actual))
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, TABLE_CSV),
+      2)
+    val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+    checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(), 
actual.toArray)
   }
 
   @Test
@@ -71,10 +77,10 @@ class DorisWriterITCase extends DorisTestBase {
     )).toDF("name", "age")
     df.write
       .format("doris")
-      .option("doris.fenodes", DorisTestBase.getFenodes)
+      .option("doris.fenodes", getFenodes)
       .option("doris.table.identifier", DATABASE + "." + TABLE_JSON)
-      .option("user", DorisTestBase.USERNAME)
-      .option("password", DorisTestBase.PASSWORD)
+      .option("user", getDorisUsername)
+      .option("password", getDorisPassword)
       .option("sink.properties.read_json_by_line", "true")
       .option("sink.properties.format", "json")
       .option("doris.sink.auto-redirect", "false")
@@ -83,9 +89,13 @@ class DorisWriterITCase extends DorisTestBase {
     session.stop()
 
     Thread.sleep(10000)
-    val actual = queryResult(TABLE_JSON);
-    val expected = ListBuffer(List("doris_json", 1), List("spark_json", 2))
-    assert(expected.equals(actual))
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, TABLE_JSON),
+      2)
+    val expected = util.Arrays.asList("doris_json,1", "spark_json,2");
+    checkResultInAnyOrder("testSinkJsonFormat", expected.toArray, 
actual.toArray)
   }
 
   @Test
@@ -104,9 +114,9 @@ class DorisWriterITCase extends DorisTestBase {
          |USING doris
          |OPTIONS(
          | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL}",
-         | "fenodes"="${DorisTestBase.getFenodes}",
-         | "user"="${DorisTestBase.USERNAME}",
-         | "password"="${DorisTestBase.PASSWORD}"
+         | "fenodes"="${getFenodes}",
+         | "user"="${getDorisUsername}",
+         | "password"="${getDorisPassword}"
          |)
          |""".stripMargin)
     session.sql(
@@ -116,40 +126,34 @@ class DorisWriterITCase extends DorisTestBase {
     session.stop()
 
     Thread.sleep(10000)
-    val actual = queryResult(TABLE_JSON_TBL);
-    val expected = ListBuffer(List("doris_tbl", 1), List("spark_tbl", 2))
-    assert(expected.equals(actual))
+    val actual = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL),
+      2)
+    val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2");
+    checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, 
actual.toArray)
   }
 
-  private def queryResult(table: String): ListBuffer[Any] = {
-    val actual = new ListBuffer[Any]
-    try {
-      val sinkStatement: Statement = DorisTestBase.connection.createStatement
-      try {
-        val sinkResultSet: ResultSet = 
sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 
1", DATABASE, table))
-        while (sinkResultSet.next) {
-          val row = List(sinkResultSet.getString("name"), 
sinkResultSet.getInt("age"))
-          actual += row
-        }
-      } finally if (sinkStatement != null) sinkStatement.close()
-    }
-    actual
-  }
 
   @throws[Exception]
   private def initializeTable(table: String): Unit = {
-    try {
-      val statement: Statement = DorisTestBase.connection.createStatement
-      try {
-        statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", 
DATABASE))
-        statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", 
DATABASE, table))
-        statement.execute(String.format(
-          "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" 
+ ") " +
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+      String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+      String.format(
+        "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" + 
") " +
           "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
           "PROPERTIES (\n" +
-          "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table))
-      } finally if (statement != null) statement.close()
-    }
+          "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table)
+    )
+  }
+
+  private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], 
actual: Array[AnyRef]): Unit = {
+    LOG.info("Checking DorisSourceITCase result. testName={}, actual={}, 
expected={}", testName, actual, expected)
+    assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
   }
 
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql
 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql
new file mode 100644
index 0000000..7632e02
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql
@@ -0,0 +1,50 @@
+DROP TABLE IF EXISTS tbl_read_tbl_all_types;
+
+CREATE TABLE tbl_read_tbl_all_types (
+`id` int,
+`c1` boolean,
+`c2` tinyint,
+`c3` smallint,
+`c4` int,
+`c5` bigint,
+`c6` largeint,
+`c7` float,
+`c8` double,
+`c9` decimal(12,4),
+`c10` date,
+`c11` datetime,
+`c12` char(1),
+`c13` varchar(16),
+`c14` string,
+`c15` Array<String>,
+`c16` Map<String, String>,
+`c17` Struct<name: String, age: int>,
+`c18` JSON,
+`c19` JSON -- doris2.1.0 can not read VARIANT
+)
+DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 2
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+
+INSERT INTO tbl_read_tbl_all_types VALUES
+    (1, true, 127, 32767, 2147483647, 9223372036854775807, 
170141183460469231731687303715884105727,
+     3.14, 2.71828, 12345.6789, '2025-03-11', '2025-03-11 12:34:56', 'A', 
'Hello, Doris!', 'This is a string',
+        ['Alice', 'Bob'], {'key1': 'value1', 'key2': 'value2'}, STRUCT('Tom', 
30), '{"key": "value"}', '{"type": "variant", "data": 123}');
+
+INSERT INTO tbl_read_tbl_all_types VALUES
+    (2, false, -128, -32768, -2147483648, -9223372036854775808, 
-170141183460469231731687303715884105728,
+     -1.23, 0.0001, -9999.9999, '2024-12-25', '2024-12-25 23:59:59', 'B', 
'Doris Test', 'Another string!',
+        ['Charlie', 'David'], {'k1': 'v1', 'k2': 'v2'}, STRUCT('Jerry', 25), 
'{"status": "ok"}', '{"data": [1, 2, 3]}' );
+
+INSERT INTO tbl_read_tbl_all_types VALUES
+    (3, true, 0, 0, 0, 0, 0,
+     0.0, 0.0, 0.0000, '2023-06-15', '2023-06-15 08:00:00', 'C', 'Test Doris', 
'Sample text',
+        ['Eve', 'Frank'], {'alpha': 'beta'}, STRUCT('Alice', 40), '{"nested": 
{"key": "value"}}', '{"variant": "test"}');
+
+INSERT INTO tbl_read_tbl_all_types VALUES
+    (4, NULL, NULL, NULL, NULL, NULL, NULL,
+     NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
+     NULL, NULL, NULL, NULL, NULL);
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql
 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql
new file mode 100644
index 0000000..8e89178
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql
@@ -0,0 +1,17 @@
+DROP TABLE IF EXISTS tbl_read_tbl_bitmap;
+
+create table tbl_read_tbl_bitmap (
+datekey int,
+hour int,
+device_id bitmap BITMAP_UNION
+)
+aggregate key (datekey, hour)
+distributed by hash(datekey, hour) buckets 1
+properties(
+  "replication_num" = "1"
+);
+
+insert into tbl_read_tbl_bitmap values
+(20200622, 1, to_bitmap(243)),
+(20200622, 2, bitmap_from_array([1,2,3,4,5,434543])),
+(20200622, 3, to_bitmap(287667876573));
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf
 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf
new file mode 100644
index 0000000..94b76e0
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+PPROF_TMPDIR="$DORIS_HOME/log/"
+
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true 
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 9+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true 
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 17+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log 
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE 
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true 
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives 
--add-opens=java.base/java.net=ALL-UNNAMED"
+
+# since 1.2, the JAVA_HOME need to be set to run BE process.
+# JAVA_HOME=/path/to/jdk/
+
+# 
https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
+# https://jemalloc.net/jemalloc.3.html
+JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:false,lg_prof_interval:32,lg_prof_sample:19,prof_gdump:false,prof_accum:false,prof_leak:false,prof_final:false"
+JEMALLOC_PROF_PRFIX=""
+
+# INFO, WARNING, ERROR, FATAL
+sys_log_level = INFO
+
+# ports for admin, web, heartbeat service
+be_port = 9060
+webserver_port = 8040
+heartbeat_service_port = 9050
+brpc_port = 8060
+arrow_flight_sql_port = 9610
+enable_debug_points = true
+
+# HTTPS configures
+enable_https = false
+# path of certificate in PEM format.
+ssl_certificate_path = "$DORIS_HOME/conf/cert.pem"
+# path of private key in PEM format.
+ssl_private_key_path = "$DORIS_HOME/conf/key.pem"
+
+
+# Choose one if there are more than one ip except loopback address.
+# Note that there should at most one ip match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# data root path, separate by ';'
+# You can specify the storage type for each root path, HDD (cold data) or SSD 
(hot data)
+# eg:
+# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris
+# storage_root_path = 
/home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD
+# /home/disk2/doris,medium:HDD(default)
+#
+# you also can specify the properties by setting '<property>:<value>', 
separate by ','
+# property 'medium' has a higher priority than the extension of path
+#
+# Default value is ${DORIS_HOME}/storage, you should create it by hand.
+# storage_root_path = ${DORIS_HOME}/storage
+
+# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+# Advanced configurations
+# sys_log_dir = ${DORIS_HOME}/log
+# sys_log_roll_mode = SIZE-MB-1024
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = *
+# log_buffer_level = -1
+# palo_cgroups
+
+# aws sdk log level
+#    Off = 0,
+#    Fatal = 1,
+#    Error = 2,
+#    Warn = 3,
+#    Info = 4,
+#    Debug = 5,
+#    Trace = 6
+# Default to turn off aws sdk log, because aws sdk errors that need to be 
cared will be output through Doris logs
+aws_log_level=0
+## If you are not running in aws cloud, you can disable EC2 metadata
+AWS_EC2_METADATA_DISABLED=true
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf
 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf
new file mode 100644
index 0000000..a45fb53
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#####################################################################
+## The uppercase properties are read and exported by bin/start_fe.sh.
+## To see all Frontend configurations,
+## see fe/src/org/apache/doris/common/Config.java
+#####################################################################
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+# Log dir
+LOG_DIR = ${DORIS_HOME}/log
+
+# For jdk 17, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 
-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m 
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR 
-Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M
 --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens 
java.base/jdk.internal.ref=ALL-UNNAMED"
+
+# Set your own JAVA_HOME
+# JAVA_HOME=/path/to/jdk/
+
+##
+## the lowercase properties are read by main program.
+##
+
+# store metadata, must be created before start FE.
+# Default value is ${DORIS_HOME}/doris-meta
+# meta_dir = ${DORIS_HOME}/doris-meta
+
+# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+http_port = 8030
+rpc_port = 9020
+query_port = 9030
+edit_log_port = 9010
+arrow_flight_sql_port = 9611
+enable_debug_points = true
+arrow_flight_token_cache_size = 50
+# Choose one if there are more than one ip except loopback address.
+# Note that there should at most one ip match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# Advanced configurations
+# log_roll_size_mb = 1024
+# INFO, WARN, ERROR, FATAL
+sys_log_level = INFO
+# NORMAL, BRIEF, ASYNC
+sys_log_mode = ASYNC
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = org.apache.doris
+# audit_log_dir = $LOG_DIR
+# audit_log_modules = slow_query, query
+# audit_log_roll_num = 10
+# meta_delay_toleration_second = 10
+# qe_max_connection = 1024
+# qe_query_timeout_second = 300
+# qe_slow_log_ms = 5000
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ecb73d3
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c 
[%t] %x - %m%n
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index f13830c..f7e08b9 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -17,8 +17,10 @@
 
 package org.apache.doris.spark.read.expression
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, 
AlwaysTrue, And, Not, Or}
 import org.apache.spark.sql.connector.expressions.{Expression, 
GeneralScalarExpression, Literal, NamedReference}
+import org.apache.spark.sql.types.{DateType, TimestampType}
 
 class V2ExpressionBuilder(inValueLengthLimit: Int) {
 
@@ -36,7 +38,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
       case _: AlwaysFalse => "1=0"
       case expr: Expression =>
         expr match {
-          case literal: Literal[_] => literal.toString
+          case literal: Literal[_] => visitLiteral(literal)
           case namedRef: NamedReference => namedRef.toString
           case e: GeneralScalarExpression => e.name() match {
             case "IN" =>
@@ -61,6 +63,17 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
     }
   }
 
+  def visitLiteral(literal: Literal[_]): String = {
+    if (literal.value() == null) {
+      return "null"
+    }
+    literal.dataType() match {
+      case DateType => 
s"'${DateTimeUtils.toJavaDate(literal.value().asInstanceOf[Int]).toString}'"
+      case TimestampType => 
s"'${DateTimeUtils.toJavaTimestamp(literal.value().asInstanceOf[Long]).toString}'"
+      case _ => literal.toString
+    }
+  }
+
   def visitStartWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
     s"`$l` LIKE '$value%'"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to