This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9141ce1  [improve] add bitmap case and ci for spark 3.2,3.4,3.5 (#312)
9141ce1 is described below

commit 9141ce151c4b2a69f07883080388c4d4b5ccff26
Author: wudi <676366...@qq.com>
AuthorDate: Mon Apr 28 11:38:36 2025 +0800

    [improve] add bitmap case and ci for spark 3.2,3.4,3.5 (#312)
---
 .github/workflows/run-e2ecase.yml                  |  15 +-
 .github/workflows/run-itcase.yml                   |  16 +-
 spark-doris-connector/pom.xml                      |  32 +-
 .../spark-doris-connector-base/pom.xml             |   1 -
 .../spark-doris-connector-it/pom.xml               |  36 +-
 .../apache/doris/spark/sql/DorisReaderITCase.scala | 643 +++++++++++----------
 .../apache/doris/spark/sql/DorisWriterITCase.scala | 483 +++++++++-------
 .../test/resources/container/ddl/write_bitmap.sql  |  12 +
 .../read/expression/V2ExpressionBuilder.scala      |  15 +-
 .../read/expression/V2ExpressionBuilder.scala      |  14 +-
 10 files changed, 725 insertions(+), 542 deletions(-)

diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml
index 2f2949b..1991210 100644
--- a/.github/workflows/run-e2ecase.yml
+++ b/.github/workflows/run-e2ecase.yml
@@ -37,7 +37,7 @@ jobs:
       with:
         distribution: adopt
         java-version: '8'
- 
+
     - name: Run E2ECases for spark 2
       run: |
        cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
@@ -46,7 +46,18 @@ jobs:
       run: |
        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
 
+    - name: Run E2ECases for spark 3.2
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.2 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
     - name: Run E2ECases for spark 3.3
       run: |
        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
-    
\ No newline at end of file
+
+    - name: Run E2ECases for spark 3.4
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.4 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
+    - name: Run E2ECases for spark 3.5
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.5 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml
index 10be11e..bdc1225 100644
--- a/.github/workflows/run-itcase.yml
+++ b/.github/workflows/run-itcase.yml
@@ -37,7 +37,7 @@ jobs:
       with:
         distribution: adopt
         java-version: '8'
- 
+
     - name: Run ITCases for spark 2
       run: |
        cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
@@ -46,7 +46,19 @@ jobs:
       run: |
        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
 
+    - name: Run ITCases for spark 3.2
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.2 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
     - name: Run ITCases for spark 3.3
       run: |
        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
-    
\ No newline at end of file
+
+    - name: Run ITCases for spark 3.4
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.4 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
+    - name: Run ITCases for spark 3.5
+      run: |
+        cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.5 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 8c975df..95809bc 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -88,7 +88,7 @@
         <arrow.version>15.0.2</arrow.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.scm.id>github</project.scm.id>
-        <netty.version>4.1.104.Final</netty.version>
+        <netty.version>4.1.110.Final</netty.version>
         <fasterxml.jackson.version>2.13.5</fasterxml.jackson.version>
         <thrift-service.version>1.0.1</thrift-service.version>
         <testcontainers.version>1.17.6</testcontainers.version>
@@ -154,9 +154,36 @@
                 <groupId>io.netty</groupId>
                 <artifactId>netty-all</artifactId>
                 <version>${netty.version}</version>
-                <scope>provided</scope>
             </dependency>
 
+            <!-- Spark 3.4 and later require a separate netty native transport dependency per platform -->
+            <!-- reference https://github.com/apache/spark/blob/v3.4.0/pom.xml#L914-L941 -->
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-transport-native-epoll</artifactId>
+                <version>${netty.version}</version>
+                <classifier>linux-x86_64</classifier>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-transport-native-epoll</artifactId>
+                <version>${netty.version}</version>
+                <classifier>linux-aarch_64</classifier>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-transport-native-kqueue</artifactId>
+                <version>${netty.version}</version>
+                <classifier>osx-aarch_64</classifier>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-transport-native-kqueue</artifactId>
+                <version>${netty.version}</version>
+                <classifier>osx-x86_64</classifier>
+            </dependency>
+            <!-- End -->
+
             <dependency>
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-core_${scala.major.version}</artifactId>
@@ -201,7 +228,6 @@
                 <groupId>org.apache.arrow</groupId>
                 <artifactId>arrow-memory-netty</artifactId>
                 <version>${arrow.version}</version>
-                <scope>runtime</scope>
             </dependency>
 
             <dependency>
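
A note on the netty changes above: per the referenced Spark pom, Spark 3.4+ pulls netty's native transports (epoll on Linux, kqueue on macOS) from classifier-specific jars, which appears to be why the connector now pins them alongside an unscoped netty-all. A quick way to verify that the native transport actually resolved on a given platform is Netty's standard availability API; a minimal sketch, not part of this commit:

    import io.netty.channel.epoll.Epoll
    import io.netty.channel.kqueue.KQueue

    object NativeTransportCheck {
      def main(args: Array[String]): Unit = {
        // isAvailable is false when the matching classifier jar (or the
        // platform itself) is missing; unavailabilityCause() says why.
        println(s"epoll available:  ${Epoll.isAvailable}")
        println(s"kqueue available: ${KQueue.isAvailable}")
        if (!Epoll.isAvailable) println(s"epoll: ${Epoll.unavailabilityCause()}")
        if (!KQueue.isAvailable) println(s"kqueue: ${KQueue.unavailabilityCause()}")
      }
    }
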
diff --git a/spark-doris-connector/spark-doris-connector-base/pom.xml b/spark-doris-connector/spark-doris-connector-base/pom.xml
index 39c6904..67ab8c3 100644
--- a/spark-doris-connector/spark-doris-connector-base/pom.xml
+++ b/spark-doris-connector/spark-doris-connector-base/pom.xml
@@ -54,7 +54,6 @@
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
             <version>${netty.version}</version>
-            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/spark-doris-connector/spark-doris-connector-it/pom.xml b/spark-doris-connector/spark-doris-connector-it/pom.xml
index 9797fb6..067a9b2 100644
--- a/spark-doris-connector/spark-doris-connector-it/pom.xml
+++ b/spark-doris-connector/spark-doris-connector-it/pom.xml
@@ -35,9 +35,16 @@
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <spark.doris.connector.artifactId>spark-doris-connector-spark-2</spark.doris.connector.artifactId>
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>${spark.doris.connector.artifactId}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
@@ -75,6 +82,12 @@
             <version>8.0.33</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.25</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
@@ -83,27 +96,18 @@
             <activation>
                 <activeByDefault>true</activeByDefault>
             </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.doris</groupId>
-                    <artifactId>spark-doris-connector-spark-2</artifactId>
-                    <version>${project.version}</version>
-                    <scope>test</scope>
-                </dependency>
-            </dependencies>
+            <properties>
+                <spark.doris.connector.artifactId>spark-doris-connector-spark-2</spark.doris.connector.artifactId>
+            </properties>
         </profile>
         <profile>
             <id>spark-3-it</id>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.doris</groupId>
-                    <artifactId>spark-doris-connector-spark-${spark.major.version}</artifactId>
-                    <version>${project.version}</version>
-                    <scope>test</scope>
-                </dependency>
-            </dependencies>
+            <properties>
+                <spark.doris.connector.artifactId>spark-doris-connector-spark-${spark.major.version}</spark.doris.connector.artifactId>
+            </properties>
         </profile>
     </profiles>
+
     <build>
         <plugins>
             <plugin>
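
The test diffs that follow wrap every SparkSession (or SparkContext) in try/finally so the session is stopped even when an assertion throws. The same contract could be captured once with a loan-pattern helper; a minimal sketch, not part of this commit (the withSession name is hypothetical):

    import org.apache.spark.sql.SparkSession

    object SparkTestUtils {
      // Loan pattern: build the session, lend it to the test body,
      // and guarantee stop() runs even if the body throws.
      def withSession[T](master: String = "local[*]")(body: SparkSession => T): T = {
        val session = SparkSession.builder().master(master).getOrCreate()
        try body(session) finally session.stop()
      }
    }

    // Usage, mirroring the refactored tests below:
    // SparkTestUtils.withSession() { session =>
    //   session.sql("select 1").collect()
    // }
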
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 893cdf9..1099655 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -71,43 +71,47 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
     initializeTable(TABLE_READ, DataModel.DUPLICATE)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource")
     val sc = new SparkContext(sparkConf)
-    // sc.setLogLevel("DEBUG")
-    val dorisSparkRDD = sc.dorisRDD(
-      tableIdentifier = Some(DATABASE + "." + TABLE_READ),
-      cfg = Some(Map(
-        "doris.fenodes" -> getFenodes,
-        "doris.request.auth.user" -> getDorisUsername,
-        "doris.request.auth.password" -> getDorisPassword,
-        "doris.fe.init.fetch" -> "false",
-        "doris.read.mode" -> readMode,
-        "doris.read.arrow-flight-sql.port" -> flightSqlPort.toString
-      ))
-    )
-    val result = dorisSparkRDD.collect()
-    sc.stop()
-
-    assert(compareCollectResult(Array(Array("doris", 18), Array("spark", 10)), result))
+    try {
+      // sc.setLogLevel("DEBUG")
+      val dorisSparkRDD = sc.dorisRDD(
+        tableIdentifier = Some(DATABASE + "." + TABLE_READ),
+        cfg = Some(Map(
+          "doris.fenodes" -> getFenodes,
+          "doris.request.auth.user" -> getDorisUsername,
+          "doris.request.auth.password" -> getDorisPassword,
+          "doris.fe.init.fetch" -> "false",
+          "doris.read.mode" -> readMode,
+          "doris.read.arrow-flight-sql.port" -> flightSqlPort.toString
+        ))
+      )
+      val result = dorisSparkRDD.collect()
+      assert(compareCollectResult(Array(Array("doris", 18), Array("spark", 10)), result))
+    } finally {
+      sc.stop()
+    }
   }
 
   @Test
   @throws[Exception]
   def testDataFrameSource(): Unit = {
     initializeTable(TABLE_READ_TBL, DataModel.UNIQUE)
-
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    val dorisSparkDF = session.read
-      .format("doris")
-      .option("doris.fenodes", getFenodes)
-      .option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL)
-      .option("doris.user", getDorisUsername)
-      .option("doris.password", getDorisPassword)
-      .option("doris.read.mode", readMode)
-      .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString)
-      .load()
-
-    val result = dorisSparkDF.collect().toList.toString()
-    session.stop()
-    assert("List([doris,18], [spark,10])".equals(result))
+    try {
+      val dorisSparkDF = session.read
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL)
+        .option("doris.user", getDorisUsername)
+        .option("doris.password", getDorisPassword)
+        .option("doris.read.mode", readMode)
+        .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString)
+        .load()
+
+      val result = dorisSparkDF.collect().toList.toString()
+      assert("List([doris,18], [spark,10])".equals(result))
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -115,27 +119,29 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
   def testSQLSource(): Unit = {
     initializeTable(TABLE_READ_TBL, DataModel.UNIQUE_MOR)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.fe.auto.fetch"="true"
-         |)
-         |""".stripMargin)
-
-    val result = session.sql(
-      """
-        |select  name,age from test_source
-        |""".stripMargin).collect().toList.toString()
-    session.stop()
-
-    assert("List([doris,18], [spark,10])".equals(result))
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.fe.auto.fetch"="true"
+           |)
+           |""".stripMargin)
+
+      val result = session.sql(
+        """
+          |select  name,age from test_source
+          |""".stripMargin).collect().toList.toString()
+      assert("List([doris,18], [spark,10])".equals(result))
+    } finally {
+      session.stop()
+    }
   }
 
   private def initializeTable(table: String, dataModel: DataModel): Unit = {
@@ -177,27 +183,29 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
   def testSQLSourceWithCondition(): Unit = {
     initializeTable(TABLE_READ_TBL, DataModel.AGGREGATE)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
-         |)
-         |""".stripMargin)
-
-    val result = session.sql(
-      """
-        |select name,age from test_source where age = 18
-        |""".stripMargin).collect().toList.toString()
-    session.stop()
-
-    assert("List([doris,18])".equals(result))
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+           |)
+           |""".stripMargin)
+
+      val result = session.sql(
+        """
+          |select name,age from test_source where age = 18
+          |""".stripMargin).collect().toList.toString()
+      assert("List([doris,18])".equals(result))
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -206,55 +214,58 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
 
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
-         |)
-         |""".stripMargin)
-    session.sql("desc test_source").show(true);
-    val actualData = session.sql(
-      """
-        |select * from test_source order by id
-        |""".stripMargin).collect()
-    session.stop()
-
-    val expectedData = Array(
-      Row(1, true, 127, 32767, 2147483647, 9223372036854775807L, "170141183460469231731687303715884105727",
-        3.14f, 2.71828, new java.math.BigDecimal("12345.6789"), Date.valueOf("2025-03-11"), Timestamp.valueOf("2025-03-11 12:34:56"), "A", "Hello, Doris!", "This is a string",
-        """["Alice","Bob"]""", Map("key1" -> "value1", "key2" -> "value2"), """{"name":"Tom","age":30}""",
-        """{"key":"value"}""", """{"type":"variant","data":123}"""),
-      Row(2, false, -128, -32768, -2147483648, -9223372036854775808L, "-170141183460469231731687303715884105728",
-        -1.23f, 0.0001, new java.math.BigDecimal("-9999.9999"), Date.valueOf("2024-12-25"), Timestamp.valueOf("2024-12-25 23:59:59"), "B", "Doris Test", "Another string!",
-        """["Charlie","David"]""", Map("k1" -> "v1", "k2" -> "v2"), """{"name":"Jerry","age":25}""",
-        """{"status":"ok"}""", """{"data":[1,2,3]}"""),
-      Row(3, true, 0, 0, 0, 0, "0",
-        0.0f, 0.0, new java.math.BigDecimal("0.0000"), Date.valueOf("2023-06-15"), Timestamp.valueOf("2023-06-15 08:00:00"), "C", "Test Doris", "Sample text",
-        """["Eve","Frank"]""", Map("alpha" -> "beta"), """{"name":"Alice","age":40}""",
-        """{"nested":{"key":"value"}}""", """{"variant":"test"}"""),
-      Row(4, null, null, null, null, null, null,
-        null, null, null, null, null, null, null, null,
-        null, null, null, null, null)
-    )
-
-    val differences = actualData.zip(expectedData).zipWithIndex.flatMap {
-      case ((actualRow, expectedRow), rowIndex) =>
-        actualRow.toSeq.zip(expectedRow.toSeq).zipWithIndex.collect {
-          case ((actualValue, expectedValue), colIndex)
-            if actualValue != expectedValue =>
-            s"Row $rowIndex, Column $colIndex: actual=$actualValue, expected=$expectedValue"
-        }
-    }
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+           |)
+           |""".stripMargin)
+      session.sql("desc test_source").show(true);
+      val actualData = session.sql(
+        """
+          |select * from test_source order by id
+          |""".stripMargin).collect()
+
+      val expectedData = Array(
+        Row(1, true, 127, 32767, 2147483647, 9223372036854775807L, "170141183460469231731687303715884105727",
+          3.14f, 2.71828, new java.math.BigDecimal("12345.6789"), Date.valueOf("2025-03-11"), Timestamp.valueOf("2025-03-11 12:34:56"), "A", "Hello, Doris!", "This is a string",
+          """["Alice","Bob"]""", Map("key1" -> "value1", "key2" -> "value2"), """{"name":"Tom","age":30}""",
+          """{"key":"value"}""", """{"type":"variant","data":123}"""),
+        Row(2, false, -128, -32768, -2147483648, -9223372036854775808L, "-170141183460469231731687303715884105728",
+          -1.23f, 0.0001, new java.math.BigDecimal("-9999.9999"), Date.valueOf("2024-12-25"), Timestamp.valueOf("2024-12-25 23:59:59"), "B", "Doris Test", "Another string!",
+          """["Charlie","David"]""", Map("k1" -> "v1", "k2" -> "v2"), """{"name":"Jerry","age":25}""",
+          """{"status":"ok"}""", """{"data":[1,2,3]}"""),
+        Row(3, true, 0, 0, 0, 0, "0",
+          0.0f, 0.0, new java.math.BigDecimal("0.0000"), Date.valueOf("2023-06-15"), Timestamp.valueOf("2023-06-15 08:00:00"), "C", "Test Doris", "Sample text",
+          """["Eve","Frank"]""", Map("alpha" -> "beta"), """{"name":"Alice","age":40}""",
+          """{"nested":{"key":"value"}}""", """{"variant":"test"}"""),
+        Row(4, null, null, null, null, null, null,
+          null, null, null, null, null, null, null, null,
+          null, null, null, null, null)
+      )
+
+      val differences = actualData.zip(expectedData).zipWithIndex.flatMap {
+        case ((actualRow, expectedRow), rowIndex) =>
+          actualRow.toSeq.zip(expectedRow.toSeq).zipWithIndex.collect {
+            case ((actualValue, expectedValue), colIndex)
+              if actualValue != expectedValue =>
+              s"Row $rowIndex, Column $colIndex: actual=$actualValue, expected=$expectedValue"
+          }
+      }
 
-    if (differences.nonEmpty) {
-      fail(s"Data mismatch found:\n${differences.mkString("\n")}")
+      if (differences.nonEmpty) {
+        fail(s"Data mismatch found:\n${differences.mkString("\n")}")
+      }
+    } finally {
+      session.stop()
     }
   }
 
@@ -263,27 +274,30 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
-         |)
-         |""".stripMargin)
-    session.sql("desc test_source").show(true);
-    val actualData = session.sql(
-      """
-        |select * from test_source order by hour
-        |""".stripMargin).collect()
-    session.stop()
-
-    assert("List([20200622,1,Read unsupported], [20200622,2,Read unsupported], [20200622,3,Read unsupported])".equals(actualData.toList.toString()))
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+           |)
+           |""".stripMargin)
+      session.sql("desc test_source").show(true);
+      val actualData = session.sql(
+        """
+          |select * from test_source order by hour
+          |""".stripMargin).collect()
+
+      assert("List([20200622,1,Read unsupported], [20200622,2,Read unsupported], [20200622,3,Read unsupported])".equals(actualData.toList.toString()))
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -294,29 +308,32 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
-         | "doris.read.bitmap-to-string"="true"
-         |)
-         |""".stripMargin)
-    session.sql("desc test_source").show(true);
-    val actualData = session.sql(
-      """
-        |select * from test_source order by hour
-        |""".stripMargin).collect()
-    session.stop()
-
-    assert("List([20200622,1,243], [20200622,2,1,2,3,4,5,434543], [20200622,3,287667876573])"
-      .equals(actualData.toList.toString()))
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
+           | "doris.read.bitmap-to-string"="true"
+           |)
+           |""".stripMargin)
+      session.sql("desc test_source").show(true);
+      val actualData = session.sql(
+        """
+          |select * from test_source order by hour
+          |""".stripMargin).collect()
+
+      assert("List([20200622,1,243], [20200622,2,1,2,3,4,5,434543], [20200622,3,287667876573])"
+        .equals(actualData.toList.toString()))
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -327,29 +344,32 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
-         | "doris.read.bitmap-to-base64"="true"
-         |)
-         |""".stripMargin)
-    session.sql("desc test_source").show(true);
-    val actualData = session.sql(
-      """
-        |select * from test_source order by hour
-        |""".stripMargin).collect()
-    session.stop()
-
-    assert("List([20200622,1,AfMAAAA=], [20200622,2,AjswAQABAAAEAAYAAAABAAEABABvoQ==], [20200622,3,A91yV/pCAAAA])"
-      .equals(actualData.toList.toString()))
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
+           | "doris.read.bitmap-to-base64"="true"
+           |)
+           |""".stripMargin)
+      session.sql("desc test_source").show(true);
+      val actualData = session.sql(
+        """
+          |select * from test_source order by hour
+          |""".stripMargin).collect()
+
+      assert("List([20200622,1,AfMAAAA=], [20200622,2,AjswAQABAAAEAAYAAAABAAEABABvoQ==], [20200622,3,A91yV/pCAAAA])"
+        .equals(actualData.toList.toString()))
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -357,76 +377,79 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
-         |)
-         |""".stripMargin)
-
-    val intFilter = session.sql(
-      """
-        |select id,c1,c2 from test_source where id = 2 and c1 = false and c4 != 3
-        |""".stripMargin).collect()
-
-    assert("List([2,false,-128])".equals(intFilter.toList.toString()))
-
-    val floatFilter = session.sql(
-      """
-        |select id,c3,c4,c7,c9 from test_source where c7 > 0 and c7 < 3.15
-        |""".stripMargin).collect()
-
-    assert("List([1,32767,2147483647,3.14,12345.6789])".equals(floatFilter.toList.toString()))
-
-    val dateFilter = session.sql(
-      """
-        |select id,c10,c11 from test_source where c10 = '2025-03-11' and c13 like 'Hello%'
-        |""".stripMargin).collect()
-
-    assert("List([1,2025-03-11,2025-03-11 12:34:56.0])".equals(dateFilter.toList.toString()))
-
-    val datetimeFilter = session.sql(
-      """
-        |select id,c11,c12 from test_source where c10 < '2025-03-11' and c11 = '2024-12-25 23:59:59'
-        |""".stripMargin).collect()
-
-    assert("List([2,2024-12-25 23:59:59.0,B])".equals(datetimeFilter.toList.toString()))
-
-    val stringFilter = session.sql(
-      """
-        |select id,c13,c14 from test_source where c11 >= '2024-12-25 23:59:59' and c13 = 'Hello, Doris!'
-        |""".stripMargin).collect()
-
-    assert("List([1,Hello, Doris!,This is a string])".equals(stringFilter.toList.toString()))
-
-    val nullFilter = session.sql(
-      """
-        |select id,c13,c14 from test_source where c14 is null
-        |""".stripMargin).collect()
-
-    assert("List([4,null,null])".equals(nullFilter.toList.toString()))
-
-    val notNullFilter = session.sql(
-      """
-        |select id from test_source where c15 is not null and c12 in ('A', 'B')
-        |""".stripMargin).collect()
-
-    assert("List([1], [2])".equals(notNullFilter.toList.toString()))
-
-    val likeFilter = session.sql(
-      """
-        |select id from test_source where c19 like '%variant%' and c13 like 'Test%'
-        |""".stripMargin).collect()
-
-    assert("List([3])".equals(likeFilter.toList.toString()))
-    session.stop()
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+           |)
+           |""".stripMargin)
+
+      val intFilter = session.sql(
+        """
+          |select id,c1,c2 from test_source where id = 2 and c1 = false and c4 != 3
+          |""".stripMargin).collect()
+
+      assert("List([2,false,-128])".equals(intFilter.toList.toString()))
+
+      val floatFilter = session.sql(
+        """
+          |select id,c3,c4,c7,c9 from test_source where c7 > 0 and c7 < 3.15
+          |""".stripMargin).collect()
+
+      assert("List([1,32767,2147483647,3.14,12345.6789])".equals(floatFilter.toList.toString()))
+
+      val dateFilter = session.sql(
+        """
+          |select id,c10,c11 from test_source where c10 = '2025-03-11' and c13 like 'Hello%'
+          |""".stripMargin).collect()
+
+      assert("List([1,2025-03-11,2025-03-11 12:34:56.0])".equals(dateFilter.toList.toString()))
+
+      val datetimeFilter = session.sql(
+        """
+          |select id,c11,c12 from test_source where c10 < '2025-03-11' and c11 = '2024-12-25 23:59:59'
+          |""".stripMargin).collect()
+
+      assert("List([2,2024-12-25 23:59:59.0,B])".equals(datetimeFilter.toList.toString()))
+
+      val stringFilter = session.sql(
+        """
+          |select id,c13,c14 from test_source where c11 >= '2024-12-25 23:59:59' and c13 = 'Hello, Doris!'
+          |""".stripMargin).collect()
+
+      assert("List([1,Hello, Doris!,This is a string])".equals(stringFilter.toList.toString()))
+
+      val nullFilter = session.sql(
+        """
+          |select id,c13,c14 from test_source where c14 is null
+          |""".stripMargin).collect()
+
+      assert("List([4,null,null])".equals(nullFilter.toList.toString()))
+
+      val notNullFilter = session.sql(
+        """
+          |select id from test_source where c15 is not null and c12 in ('A', 'B')
+          |""".stripMargin).collect()
+
+      assert("List([1], [2])".equals(notNullFilter.toList.toString()))
+
+      val likeFilter = session.sql(
+        """
+          |select id from test_source where c19 like '%variant%' and c13 like 'Test%'
+          |""".stripMargin).collect()
+
+      assert("List([3])".equals(likeFilter.toList.toString()))
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -438,65 +461,67 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
      String.format("insert into %s.%s  values ('中文',60)", DATABASE, TABLE_READ_UTF8_TBL))
 
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_UTF8_TBL}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
-         |)
-         |""".stripMargin)
-
-    val utf8Filter = session.sql(
-      """
-        |select name,age from test_source where name = '中文'
-        |""".stripMargin).collect()
-
-    assert("List([中文,60])".equals(utf8Filter.toList.toString()))
-    session.stop()
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_UTF8_TBL}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+           |)
+           |""".stripMargin)
+
+      val utf8Filter = session.sql(
+        """
+          |select name,age from test_source where name = '中文'
+          |""".stripMargin).collect()
+
+      assert("List([中文,60])".equals(utf8Filter.toList.toString()))
+    } finally {
+      session.stop()
+    }
   }
 
-
   @Test
   def buildCaseWhenTest(): Unit = {
    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
 
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_source
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.read.mode"="${readMode}",
-         | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
-         |)
-         |""".stripMargin)
-
-    val resultData = session.sql(
-      """
-        |select * from (
-        |   select
-        |    id,
-        |    (case when c5 > 10 then c2 else null end) as cc1,
-        |    (case when c4 < 5 then c3 else null end) as cc2
-        |   from test_source where c2 is not null
-        |) where !(cc1 is null and cc2 is null) order by id
-        |""".stripMargin)
-
-    assert("List([1,127,null], [2,null,-32768], [3,null,0])".equals(resultData.collect().toList.toString()))
-
-    session.stop()
-
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+           |)
+           |""".stripMargin)
+
+      val resultData = session.sql(
+        """
+          |select * from (
+          |   select
+          |    id,
+          |    (case when c5 > 10 then c2 else null end) as cc1,
+          |    (case when c4 < 5 then c3 else null end) as cc2
+          |   from test_source where c2 is not null
+          |) where !(cc1 is null and cc2 is null) order by id
+          |""".stripMargin)
+
+      assert("List([1,127,null], [2,null,-32768], [3,null,0])".equals(resultData.collect().toList.toString()))
+    } finally {
+      session.stop()
+    }
   }
 }
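
The bitmap tests above exercise the new doris.read.bitmap-to-string and doris.read.bitmap-to-base64 options through SQL temporary views; the same options apply on the DataFrame reader path. A sketch under assumed connection values (the fenodes address, credentials, and table name below are placeholders):

    import org.apache.spark.sql.SparkSession

    object BitmapReadExample {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder().master("local[*]").getOrCreate()
        try {
          // BITMAP columns come back as "Read unsupported" unless one of the
          // bitmap options is set; bitmap-to-string renders them as value lists.
          val df = session.read
            .format("doris")
            .option("doris.fenodes", "127.0.0.1:8030") // placeholder FE address
            .option("doris.table.identifier", "example_db.tbl_read_tbl_bitmap") // placeholder
            .option("doris.user", "root") // placeholder credentials
            .option("doris.password", "")
            .option("doris.read.bitmap-to-string", "true")
            .load()
          df.orderBy("hour").show(false)
        } finally {
          session.stop()
        }
      }
    }
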
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
index 51201e4..8244c35 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -43,40 +43,44 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   val TABLE_JSON_TBL: String = "tbl_json_tbl"
   val TABLE_JSON_TBL_OVERWRITE: String = "tbl_json_tbl_overwrite"
   val TABLE_JSON_TBL_ARROW: String = "tbl_json_tbl_arrow"
+  val TABLE_BITMAP_TBL: String = "tbl_write_tbl_bitmap"
 
   @Test
   @throws[Exception]
   def testSinkCsvFormat(): Unit = {
     initializeTable(TABLE_CSV, DataModel.DUPLICATE)
     val session = SparkSession.builder().master("local[1]").getOrCreate()
-    val df = session.createDataFrame(Seq(
-      ("doris_csv", 1),
-      ("spark_csv", 2)
-    )).toDF("name", "age")
-    df.write
-      .format("doris")
-      .option("doris.fenodes", getFenodes)
-      .option("doris.sink.auto-redirect", false)
-      .option("doris.table.identifier", DATABASE + "." + TABLE_CSV)
-      .option("user", getDorisUsername)
-      .option("password", getDorisPassword)
-      .option("sink.properties.column_separator", ",")
-      .option("sink.properties.line_delimiter", "\n")
-      .option("sink.properties.format", "csv")
-      .option("doris.sink.batch.interval.ms", "5000")
-      .option("doris.sink.batch.size", "1")
-      .mode(SaveMode.Append)
-      .save()
-    session.stop()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_csv", 1),
+        ("spark_csv", 2)
+      )).toDF("name", "age")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.sink.auto-redirect", false)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_CSV)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("sink.properties.column_separator", ",")
+        .option("sink.properties.line_delimiter", "\n")
+        .option("sink.properties.format", "csv")
+        .option("doris.sink.batch.interval.ms", "5000")
+        .option("doris.sink.batch.size", "1")
+        .mode(SaveMode.Append)
+        .save()
 
-    Thread.sleep(15000)
-    val actual = ContainerUtils.executeSQLStatement(
-      getDorisQueryConnection,
-      LOG,
-      String.format("select * from %s.%s", DATABASE, TABLE_CSV),
-      2)
-    val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
-    checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(), actual.toArray)
+      Thread.sleep(15000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, TABLE_CSV),
+        2)
+      val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+      checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(), actual.toArray)
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -84,31 +88,34 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   def testSinkCsvFormatHideSep(): Unit = {
     initializeTable(TABLE_CSV_HIDE_SEP, DataModel.AGGREGATE)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    val df = session.createDataFrame(Seq(
-      ("doris_csv", 1),
-      ("spark_csv", 2)
-    )).toDF("name", "age")
-    df.write
-      .format("doris")
-      .option("doris.fenodes", getFenodes + "," + getFenodes)
-      .option("doris.table.identifier", DATABASE + "." + TABLE_CSV_HIDE_SEP)
-      .option("user", getDorisUsername)
-      .option("password", getDorisPassword)
-      .option("sink.properties.column_separator", "\\x01")
-      .option("sink.properties.line_delimiter", "\\x02")
-      .option("sink.properties.format", "csv")
-      .mode(SaveMode.Append)
-      .save()
-    session.stop()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_csv", 1),
+        ("spark_csv", 2)
+      )).toDF("name", "age")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", getFenodes + "," + getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_CSV_HIDE_SEP)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("sink.properties.column_separator", "\\x01")
+        .option("sink.properties.line_delimiter", "\\x02")
+        .option("sink.properties.format", "csv")
+        .mode(SaveMode.Append)
+        .save()
 
-    Thread.sleep(10000)
-    val actual = ContainerUtils.executeSQLStatement(
-      getDorisQueryConnection,
-      LOG,
-      String.format("select * from %s.%s", DATABASE, TABLE_CSV_HIDE_SEP),
-      2)
-    val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
-    checkResultInAnyOrder("testSinkCsvFormatHideSep", expected.toArray(), actual.toArray)
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, TABLE_CSV_HIDE_SEP),
+        2)
+      val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+      checkResultInAnyOrder("testSinkCsvFormatHideSep", expected.toArray(), actual.toArray)
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -116,29 +123,32 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   def testSinkGroupCommit(): Unit = {
     initializeTable(TABLE_GROUP_COMMIT, DataModel.DUPLICATE)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    val df = session.createDataFrame(Seq(
-      ("doris_csv", 1),
-      ("spark_csv", 2)
-    )).toDF("name", "age")
-    df.write
-      .format("doris")
-      .option("doris.fenodes", getFenodes)
-      .option("doris.table.identifier", DATABASE + "." + TABLE_GROUP_COMMIT)
-      .option("user", getDorisUsername)
-      .option("password", getDorisPassword)
-      .option("sink.properties.group_commit", "sync_mode")
-      .mode(SaveMode.Append)
-      .save()
-    session.stop()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_csv", 1),
+        ("spark_csv", 2)
+      )).toDF("name", "age")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_GROUP_COMMIT)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("sink.properties.group_commit", "sync_mode")
+        .mode(SaveMode.Append)
+        .save()
 
-    Thread.sleep(10000)
-    val actual = ContainerUtils.executeSQLStatement(
-      getDorisQueryConnection,
-      LOG,
-      String.format("select * from %s.%s", DATABASE, TABLE_GROUP_COMMIT),
-      2)
-    val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
-    checkResultInAnyOrder("testSinkGroupCommit", expected.toArray(), actual.toArray)
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, TABLE_GROUP_COMMIT),
+        2)
+      val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+      checkResultInAnyOrder("testSinkGroupCommit", expected.toArray(), actual.toArray)
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -146,31 +156,34 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   def testSinkEmptyPartition(): Unit = {
     initializeTable(TABLE_JSON_EMPTY_PARTITION, DataModel.AGGREGATE)
     val session = SparkSession.builder().master("local[2]").getOrCreate()
-    val df = session.createDataFrame(Seq(
-      ("doris_json", 1)
-    )).toDF("name", "age")
-    df.repartition(2).write
-      .format("doris")
-      .option("doris.fenodes", getFenodes)
-      .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_EMPTY_PARTITION)
-      .option("user", getDorisUsername)
-      .option("password", getDorisPassword)
-      .option("sink.properties.read_json_by_line", "true")
-      .option("sink.properties.format", "json")
-      .option("doris.sink.auto-redirect", "false")
-      .option("doris.sink.enable-2pc", "true")
-      .mode(SaveMode.Append)
-      .save()
-    session.stop()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_json", 1)
+      )).toDF("name", "age")
+      df.repartition(2).write
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_EMPTY_PARTITION)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("sink.properties.read_json_by_line", "true")
+        .option("sink.properties.format", "json")
+        .option("doris.sink.auto-redirect", "false")
+        .option("doris.sink.enable-2pc", "true")
+        .mode(SaveMode.Append)
+        .save()
 
-    Thread.sleep(10000)
-    val actual = ContainerUtils.executeSQLStatement(
-      getDorisQueryConnection,
-      LOG,
-      String.format("select * from %s.%s", DATABASE, TABLE_JSON_EMPTY_PARTITION),
-      2)
-    val expected = util.Arrays.asList("doris_json,1");
-    checkResultInAnyOrder("testSinkEmptyPartition", expected.toArray, actual.toArray)
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, TABLE_JSON_EMPTY_PARTITION),
+        2)
+      val expected = util.Arrays.asList("doris_json,1");
+      checkResultInAnyOrder("testSinkEmptyPartition", expected.toArray, actual.toArray)
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -178,31 +191,34 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   def testSinkArrowFormat(): Unit = {
     initializeTable(TABLE_JSON_TBL_ARROW, DataModel.DUPLICATE)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    val df = session.createDataFrame(Seq(
-      ("doris_json", 1),
-      ("spark_json", 2)
-    )).toDF("name", "age")
-    df.write
-      .format("doris")
-      .option("doris.fenodes", getFenodes)
-      .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_TBL_ARROW)
-      .option("user", getDorisUsername)
-      .option("password", getDorisPassword)
-      .option("sink.properties.format", "arrow")
-      .option("doris.sink.batch.size", "1")
-      .option("doris.sink.enable-2pc", "true")
-      .mode(SaveMode.Append)
-      .save()
-    session.stop()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_json", 1),
+        ("spark_json", 2)
+      )).toDF("name", "age")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_TBL_ARROW)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("sink.properties.format", "arrow")
+        .option("doris.sink.batch.size", "1")
+        .option("doris.sink.enable-2pc", "true")
+        .mode(SaveMode.Append)
+        .save()
 
-    Thread.sleep(10000)
-    val actual = ContainerUtils.executeSQLStatement(
-      getDorisQueryConnection,
-      LOG,
-      String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_ARROW),
-      2)
-    val expected = util.Arrays.asList("doris_json,1", "spark_json,2");
-    checkResultInAnyOrder("testSinkArrowFormat", expected.toArray, actual.toArray)
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_ARROW),
+        2)
+      val expected = util.Arrays.asList("doris_json,1", "spark_json,2");
+      checkResultInAnyOrder("testSinkArrowFormat", expected.toArray, actual.toArray)
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -210,31 +226,34 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   def testSinkJsonFormat(): Unit = {
     initializeTable(TABLE_JSON, DataModel.UNIQUE)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    val df = session.createDataFrame(Seq(
-      ("doris_json", 1),
-      ("spark_json", 2)
-    )).toDF("name", "age")
-    df.write
-      .format("doris")
-      .option("doris.fenodes", getFenodes)
-      .option("doris.table.identifier", DATABASE + "." + TABLE_JSON)
-      .option("user", getDorisUsername)
-      .option("password", getDorisPassword)
-      .option("sink.properties.read_json_by_line", "true")
-      .option("sink.properties.format", "json")
-      .option("doris.sink.auto-redirect", "false")
-      .mode(SaveMode.Append)
-      .save()
-    session.stop()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_json", 1),
+        ("spark_json", 2)
+      )).toDF("name", "age")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + TABLE_JSON)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("sink.properties.read_json_by_line", "true")
+        .option("sink.properties.format", "json")
+        .option("doris.sink.auto-redirect", "false")
+        .mode(SaveMode.Append)
+        .save()
 
-    Thread.sleep(10000)
-    val actual = ContainerUtils.executeSQLStatement(
-      getDorisQueryConnection,
-      LOG,
-      String.format("select * from %s.%s", DATABASE, TABLE_JSON),
-      2)
-    val expected = util.Arrays.asList("doris_json,1", "spark_json,2");
-    checkResultInAnyOrder("testSinkJsonFormat", expected.toArray, actual.toArray)
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, TABLE_JSON),
+        2)
+      val expected = util.Arrays.asList("doris_json,1", "spark_json,2");
+      checkResultInAnyOrder("testSinkJsonFormat", expected.toArray, actual.toArray)
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -242,36 +261,39 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   def testSQLSinkFormat(): Unit = {
     initializeTable(TABLE_JSON_TBL, DataModel.UNIQUE_MOR)
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    val df = session.createDataFrame(Seq(
-      ("doris_tbl", 1),
-      ("spark_tbl", 2)
-    )).toDF("name", "age")
-    df.createTempView("mock_source")
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_sink
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}"
-         |)
-         |""".stripMargin)
-    session.sql(
-      """
-        |insert into test_sink select  name,age from mock_source
-        |""".stripMargin)
-    session.stop()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_tbl", 1),
+        ("spark_tbl", 2)
+      )).toDF("name", "age")
+      df.createTempView("mock_source")
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_sink
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}"
+           |)
+           |""".stripMargin)
+      session.sql(
+        """
+          |insert into test_sink select  name,age from mock_source
+          |""".stripMargin)
 
-    Thread.sleep(10000)
-    val actual = ContainerUtils.executeSQLStatement(
-      getDorisQueryConnection,
-      LOG,
-      String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL),
-      2)
-    val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2");
-    checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, actual.toArray)
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL),
+        2)
+      val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2");
+      checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, actual.toArray)
+    } finally {
+      session.stop()
+    }
   }
 
   @Test
@@ -286,41 +310,92 @@ class DorisWriterITCase extends AbstractContainerTestBase {
       String.format("insert into %s.%s  values ('history-spark',1110)", 
DATABASE, TABLE_JSON_TBL_OVERWRITE))
 
     val session = SparkSession.builder().master("local[*]").getOrCreate()
-    val df = session.createDataFrame(Seq(
-      ("doris_tbl", 1),
-      ("spark_tbl", 2)
-    )).toDF("name", "age")
-    df.createTempView("mock_source")
-    session.sql(
-      s"""
-         |CREATE TEMPORARY VIEW test_sink
-         |USING doris
-         |OPTIONS(
-         | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL_OVERWRITE}",
-         | "fenodes"="${getFenodes}",
-         | "user"="${getDorisUsername}",
-         | "password"="${getDorisPassword}",
-         | "doris.query.port"="${getQueryPort}",
-         | "doris.sink.label.prefix"="doris-label-customer",
-         | "doris.sink.enable-2pc"="true"
-         |)
-         |""".stripMargin)
-    session.sql(
-      """
-        |insert overwrite table test_sink select  name,age from mock_source
-        |""".stripMargin)
-    session.stop()
+    try {
+      val df = session.createDataFrame(Seq(
+        ("doris_tbl", 1),
+        ("spark_tbl", 2)
+      )).toDF("name", "age")
+      df.createTempView("mock_source")
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_sink
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL_OVERWRITE}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.query.port"="${getQueryPort}",
+           | "doris.sink.label.prefix"="doris-label-customer",
+           | "doris.sink.enable-2pc"="true"
+           |)
+           |""".stripMargin)
+      session.sql(
+        """
+          |insert overwrite table test_sink select  name,age from mock_source
+          |""".stripMargin)
 
-    Thread.sleep(10000)
-    val actual = ContainerUtils.executeSQLStatement(
-      getDorisQueryConnection,
-      LOG,
-      String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_OVERWRITE),
-      2)
-    val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2");
-    checkResultInAnyOrder("testSQLSinkOverwrite", expected.toArray, 
actual.toArray)
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, 
TABLE_JSON_TBL_OVERWRITE),
+        2)
+      val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2")
+      checkResultInAnyOrder("testSQLSinkOverwrite", expected.toArray, actual.toArray)
+    } finally {
+      session.stop()
+    }
   }
 
+  @Test
+  def testWriteBitmap(): Unit = {
+    val targetInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/write_bitmap.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, targetInitSql: _*)
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = session.createDataFrame(Seq(
+        (20200621, 1, "243"),
+        (20200622, 2, "1"),
+        (20200623, 3, "287667876573")
+      )).toDF("datekey", "hour", "device_id")
+      df.createTempView("mock_source")
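+      // "doris.write.fields" below both orders the written columns and remaps
+      // device_id through Doris' to_bitmap() (presumably forwarded as the stream
+      // load "columns" header), so plain ids aggregate into the BITMAP_UNION column.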
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_sink
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + TABLE_BITMAP_TBL}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.write.fields"="datekey,hour,device_id,device_id=to_bitmap(device_id)"
+           |)
+           |""".stripMargin)
+      session.sql(
+        """
+          |insert into test_sink select datekey,hour,device_id from mock_source
+          |""".stripMargin)
+
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select datekey,hour,bitmap_to_string(device_id) from 
%s.%s", DATABASE, TABLE_BITMAP_TBL),
+        3)
+      val expected = util.Arrays.asList("20200621,1,243", "20200622,2,1", "20200623,3,287667876573")
+      checkResultInAnyOrder("testWriteBitmap", expected.toArray, actual.toArray)
+    } finally {
+      session.stop()
+    }
+  }
+
   private def initializeTable(table: String, dataModel: DataModel): Unit = {
     val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
    val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
@@ -341,7 +416,7 @@ class DorisWriterITCase extends AbstractContainerTestBase {
   }
 
  private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = {
-    LOG.info("Checking DorisWriterFailoverITCase result. testName={}, 
actual={}, expected={}", testName, actual, expected)
+    LOG.info("Checking DorisWriterITCase result. testName={}, actual={}, 
expected={}", testName, actual, expected)
     assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
   }
 }
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_bitmap.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_bitmap.sql
new file mode 100644
index 0000000..0c76164
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_bitmap.sql
@@ -0,0 +1,14 @@
+DROP TABLE IF EXISTS tbl_write_tbl_bitmap;
+
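+-- Aggregate model: device_id uses BITMAP_UNION, so rows sharing the same
+-- (datekey, hour) key merge their device ids into a single bitmap.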
+create table tbl_write_tbl_bitmap (
+datekey int,
+hour int,
+device_id bitmap BITMAP_UNION
+)
+aggregate key (datekey, hour)
+distributed by hash(datekey, hour) buckets 1
+properties(
+  "replication_num" = "1"
+);
\ No newline at end of file
diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index 1913a51..cba20f1 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -17,8 +17,10 @@
 
 package org.apache.doris.spark.read.expression
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or}
 import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference}
+import org.apache.spark.sql.types.{DateType, TimestampType}
 
 class V2ExpressionBuilder(inValueLengthLimit: Int) {
 
@@ -36,7 +38,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
       case _: AlwaysFalse => "1=0"
       case expr: Expression =>
         expr match {
-          case literal: Literal[_] => literal.toString
+          case literal: Literal[_] => visitLiteral(literal)
           case namedRef: NamedReference => namedRef.toString
           case e: GeneralScalarExpression => e.name() match {
             case "IN" =>
@@ -76,6 +78,20 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
     }
   }
 
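+  // Catalyst stores DateType values as Int days since the Unix epoch and
+  // TimestampType values as Long microseconds since the epoch; Literal.toString
+  // would render a bare number, so convert and quote them for Doris to parse.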
+  def visitLiteral(literal: Literal[_]): String = {
+    if (literal.value() == null) {
+      return "null"
+    }
+    literal.dataType() match {
+      case DateType => s"'${DateTimeUtils.toJavaDate(literal.value().asInstanceOf[Int]).toString}'"
+      case TimestampType => s"'${DateTimeUtils.toJavaTimestamp(literal.value().asInstanceOf[Long]).toString}'"
+      case _ => literal.toString
+    }
+  }
+
   def visitStartWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
     s"`$l` LIKE '$value%'"
diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index 1913a51..6e1c010 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -17,8 +17,10 @@
 
 package org.apache.doris.spark.read.expression
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or}
 import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference}
+import org.apache.spark.sql.types.{DateType, TimestampType}
 
 class V2ExpressionBuilder(inValueLengthLimit: Int) {
 
@@ -36,7 +38,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
       case _: AlwaysFalse => "1=0"
       case expr: Expression =>
         expr match {
-          case literal: Literal[_] => literal.toString
+          case literal: Literal[_] => visitLiteral(literal)
           case namedRef: NamedReference => namedRef.toString
           case e: GeneralScalarExpression => e.name() match {
             case "IN" =>
@@ -76,6 +78,17 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
     }
   }
 
+  def visitLiteral(literal: Literal[_]): String = {
+    if (literal.value() == null) {
+      return "null"
+    }
+    literal.dataType() match {
+      case DateType => s"'${DateTimeUtils.toJavaDate(literal.value().asInstanceOf[Int]).toString}'"
+      case TimestampType => s"'${DateTimeUtils.toJavaTimestamp(literal.value().asInstanceOf[Long]).toString}'"
+      case _ => literal.toString
+    }
+  }
+
   def visitStartWith(l: String, r: String): String = {
     val value = r.substring(1, r.length - 1)
     s"`$l` LIKE '$value%'"

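For context, a minimal sketch of what the V2ExpressionBuilder change means for a
reader of the connector, assuming a Doris table with a DATE column `dt` (the
table name, FE address, and column are placeholders; only the option names come
from the tests above):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.read.format("doris")
      .option("doris.table.identifier", "db.tbl") // placeholder table
      .option("doris.fenodes", "127.0.0.1:8030")  // placeholder FE address
      .option("user", "root")
      .option("password", "")
      .load()
    // The date literal reaches the connector as Literal(18434, DateType), i.e.
    // days since the Unix epoch; visitLiteral now pushes it down as
    // `dt` = '2020-06-21' instead of the bare integer 18434, which Doris
    // cannot parse as a DATE.
    df.filter(col("dt") === java.sql.Date.valueOf("2020-06-21")).show()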

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
