This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 9141ce1  [improve] add bitmap case and ci for spark 3.2,3.4,3.5 (#312)
9141ce1 is described below

commit 9141ce151c4b2a69f07883080388c4d4b5ccff26
Author: wudi <676366...@qq.com>
AuthorDate: Mon Apr 28 11:38:36 2025 +0800

    [improve] add bitmap case and ci for spark 3.2,3.4,3.5 (#312)
---
 .github/workflows/run-e2ecase.yml                  |  15 +-
 .github/workflows/run-itcase.yml                   |  16 +-
 spark-doris-connector/pom.xml                      |  32 +-
 .../spark-doris-connector-base/pom.xml             |   1 -
 .../spark-doris-connector-it/pom.xml               |  36 +-
 .../apache/doris/spark/sql/DorisReaderITCase.scala | 643 +++++++++++----------
 .../apache/doris/spark/sql/DorisWriterITCase.scala | 483 +++++++++-------
 .../test/resources/container/ddl/write_bitmap.sql  |  12 +
 .../read/expression/V2ExpressionBuilder.scala      |  15 +-
 .../read/expression/V2ExpressionBuilder.scala      |  14 +-
 10 files changed, 725 insertions(+), 542 deletions(-)

diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml
index 2f2949b..1991210 100644
--- a/.github/workflows/run-e2ecase.yml
+++ b/.github/workflows/run-e2ecase.yml
@@ -37,7 +37,7 @@ jobs:
         with:
           distribution: adopt
           java-version: '8'
-
+
       - name: Run E2ECases for spark 2
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
@@ -46,7 +46,18 @@ jobs:
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"

+      - name: Run E2ECases for spark 3.2
+        run: |
+          cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.2 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
       - name: Run E2ECases for spark 3.3
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
-
\ No newline at end of file
+
+      - name: Run E2ECases for spark 3.4
+        run: |
+          cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.4 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
+      - name: Run E2ECases for spark 3.5
+        run: |
+          cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.5 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml
index 10be11e..bdc1225 100644
--- a/.github/workflows/run-itcase.yml
+++ b/.github/workflows/run-itcase.yml
@@ -37,7 +37,7 @@ jobs:
         with:
           distribution: adopt
           java-version: '8'
-
+
       - name: Run ITCases for spark 2
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
@@ -46,7 +46,19 @@ jobs:
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"

+      - name: Run ITCases for spark 3.2
+        run: |
+          cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.2 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
- name: Run ITCases for spark 3.3 run: | cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" - \ No newline at end of file + + - name: Run ITCases for spark 3.4 + run: | + cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.4 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + + - name: Run ITCases for spark 3.5 + run: | + cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.5 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0" + diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 8c975df..95809bc 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -88,7 +88,7 @@ <arrow.version>15.0.2</arrow.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.scm.id>github</project.scm.id> - <netty.version>4.1.104.Final</netty.version> + <netty.version>4.1.110.Final</netty.version> <fasterxml.jackson.version>2.13.5</fasterxml.jackson.version> <thrift-service.version>1.0.1</thrift-service.version> <testcontainers.version>1.17.6</testcontainers.version> @@ -154,9 +154,36 @@ <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty.version}</version> - <scope>provided</scope> </dependency> + <!-- Spark 3.4 and later require a separate --> + <!-- reference https://github.com/apache/spark/blob/v3.4.0/pom.xml#L914-L941 --> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + <version>${netty.version}</version> + <classifier>linux-x86_64</classifier> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + <version>${netty.version}</version> + <classifier>linux-aarch_64</classifier> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-kqueue</artifactId> + <version>${netty.version}</version> + <classifier>osx-aarch_64</classifier> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-kqueue</artifactId> + <version>${netty.version}</version> + <classifier>osx-x86_64</classifier> + </dependency> + <!-- End --> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.major.version}</artifactId> @@ -201,7 +228,6 @@ <groupId>org.apache.arrow</groupId> <artifactId>arrow-memory-netty</artifactId> <version>${arrow.version}</version> - <scope>runtime</scope> </dependency> <dependency> diff --git a/spark-doris-connector/spark-doris-connector-base/pom.xml b/spark-doris-connector/spark-doris-connector-base/pom.xml index 39c6904..67ab8c3 100644 --- a/spark-doris-connector/spark-doris-connector-base/pom.xml +++ b/spark-doris-connector/spark-doris-connector-base/pom.xml @@ -54,7 +54,6 @@ <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty.version}</version> - <scope>provided</scope> </dependency> <dependency> diff --git a/spark-doris-connector/spark-doris-connector-it/pom.xml b/spark-doris-connector/spark-doris-connector-it/pom.xml index 9797fb6..067a9b2 100644 --- a/spark-doris-connector/spark-doris-connector-it/pom.xml +++ b/spark-doris-connector/spark-doris-connector-it/pom.xml @@ -35,9 +35,16 @@ <maven.compiler.source>8</maven.compiler.source> 
<maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <spark.doris.connector.artifactId>spark-doris-connector-spark-2</spark.doris.connector.artifactId> </properties> <dependencies> + <dependency> + <groupId>org.apache.doris</groupId> + <artifactId>${spark.doris.connector.artifactId}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>testcontainers</artifactId> @@ -75,6 +82,12 @@ <version>8.0.33</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.25</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> @@ -83,27 +96,18 @@ <activation> <activeByDefault>true</activeByDefault> </activation> - <dependencies> - <dependency> - <groupId>org.apache.doris</groupId> - <artifactId>spark-doris-connector-spark-2</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - </dependencies> + <properties> + <spark.doris.connector.artifactId>spark-doris-connector-spark-2</spark.doris.connector.artifactId> + </properties> </profile> <profile> <id>spark-3-it</id> - <dependencies> - <dependency> - <groupId>org.apache.doris</groupId> - <artifactId>spark-doris-connector-spark-${spark.major.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - </dependencies> + <properties> + <spark.doris.connector.artifactId>spark-doris-connector-spark-${spark.major.version}</spark.doris.connector.artifactId> + </properties> </profile> </profiles> + <build> <plugins> <plugin> diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala index 893cdf9..1099655 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala @@ -71,43 +71,47 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo initializeTable(TABLE_READ, DataModel.DUPLICATE) val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource") val sc = new SparkContext(sparkConf) - // sc.setLogLevel("DEBUG") - val dorisSparkRDD = sc.dorisRDD( - tableIdentifier = Some(DATABASE + "." + TABLE_READ), - cfg = Some(Map( - "doris.fenodes" -> getFenodes, - "doris.request.auth.user" -> getDorisUsername, - "doris.request.auth.password" -> getDorisPassword, - "doris.fe.init.fetch" -> "false", - "doris.read.mode" -> readMode, - "doris.read.arrow-flight-sql.port" -> flightSqlPort.toString - )) - ) - val result = dorisSparkRDD.collect() - sc.stop() - - assert(compareCollectResult(Array(Array("doris", 18), Array("spark", 10)), result)) + try { + // sc.setLogLevel("DEBUG") + val dorisSparkRDD = sc.dorisRDD( + tableIdentifier = Some(DATABASE + "." 
+ TABLE_READ), + cfg = Some(Map( + "doris.fenodes" -> getFenodes, + "doris.request.auth.user" -> getDorisUsername, + "doris.request.auth.password" -> getDorisPassword, + "doris.fe.init.fetch" -> "false", + "doris.read.mode" -> readMode, + "doris.read.arrow-flight-sql.port" -> flightSqlPort.toString + )) + ) + val result = dorisSparkRDD.collect() + assert(compareCollectResult(Array(Array("doris", 18), Array("spark", 10)), result)) + } finally { + sc.stop() + } } @Test @throws[Exception] def testDataFrameSource(): Unit = { initializeTable(TABLE_READ_TBL, DataModel.UNIQUE) - val session = SparkSession.builder().master("local[*]").getOrCreate() - val dorisSparkDF = session.read - .format("doris") - .option("doris.fenodes", getFenodes) - .option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL) - .option("doris.user", getDorisUsername) - .option("doris.password", getDorisPassword) - .option("doris.read.mode", readMode) - .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString) - .load() - - val result = dorisSparkDF.collect().toList.toString() - session.stop() - assert("List([doris,18], [spark,10])".equals(result)) + try { + val dorisSparkDF = session.read + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL) + .option("doris.user", getDorisUsername) + .option("doris.password", getDorisPassword) + .option("doris.read.mode", readMode) + .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString) + .load() + + val result = dorisSparkDF.collect().toList.toString() + assert("List([doris,18], [spark,10])".equals(result)) + } finally { + session.stop() + } } @Test @@ -115,27 +119,29 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo def testSQLSource(): Unit = { initializeTable(TABLE_READ_TBL, DataModel.UNIQUE_MOR) val session = SparkSession.builder().master("local[*]").getOrCreate() - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.fe.auto.fetch"="true" - |) - |""".stripMargin) - - val result = session.sql( - """ - |select name,age from test_source - |""".stripMargin).collect().toList.toString() - session.stop() - - assert("List([doris,18], [spark,10])".equals(result)) + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.fe.auto.fetch"="true" + |) + |""".stripMargin) + + val result = session.sql( + """ + |select name,age from test_source + |""".stripMargin).collect().toList.toString() + assert("List([doris,18], [spark,10])".equals(result)) + } finally { + session.stop() + } } private def initializeTable(table: String, dataModel: DataModel): Unit = { @@ -177,27 +183,29 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo def testSQLSourceWithCondition(): Unit = { initializeTable(TABLE_READ_TBL, DataModel.AGGREGATE) val session = SparkSession.builder().master("local[*]").getOrCreate() - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." 
+ TABLE_READ_TBL}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" - |) - |""".stripMargin) - - val result = session.sql( - """ - |select name,age from test_source where age = 18 - |""".stripMargin).collect().toList.toString() - session.stop() - - assert("List([doris,18])".equals(result)) + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + + val result = session.sql( + """ + |select name,age from test_source where age = 18 + |""".stripMargin).collect().toList.toString() + assert("List([doris,18])".equals(result)) + } finally { + session.stop() + } } @Test @@ -206,55 +214,58 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) val session = SparkSession.builder().master("local[*]").getOrCreate() - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" - |) - |""".stripMargin) - session.sql("desc test_source").show(true); - val actualData = session.sql( - """ - |select * from test_source order by id - |""".stripMargin).collect() - session.stop() - - val expectedData = Array( - Row(1, true, 127, 32767, 2147483647, 9223372036854775807L, "170141183460469231731687303715884105727", - 3.14f, 2.71828, new java.math.BigDecimal("12345.6789"), Date.valueOf("2025-03-11"), Timestamp.valueOf("2025-03-11 12:34:56"), "A", "Hello, Doris!", "This is a string", - """["Alice","Bob"]""", Map("key1" -> "value1", "key2" -> "value2"), """{"name":"Tom","age":30}""", - """{"key":"value"}""", """{"type":"variant","data":123}"""), - Row(2, false, -128, -32768, -2147483648, -9223372036854775808L, "-170141183460469231731687303715884105728", - -1.23f, 0.0001, new java.math.BigDecimal("-9999.9999"), Date.valueOf("2024-12-25"), Timestamp.valueOf("2024-12-25 23:59:59"), "B", "Doris Test", "Another string!", - """["Charlie","David"]""", Map("k1" -> "v1", "k2" -> "v2"), """{"name":"Jerry","age":25}""", - """{"status":"ok"}""", """{"data":[1,2,3]}"""), - Row(3, true, 0, 0, 0, 0, "0", - 0.0f, 0.0, new java.math.BigDecimal("0.0000"), Date.valueOf("2023-06-15"), Timestamp.valueOf("2023-06-15 08:00:00"), "C", "Test Doris", "Sample text", - """["Eve","Frank"]""", Map("alpha" -> "beta"), """{"name":"Alice","age":40}""", - """{"nested":{"key":"value"}}""", """{"variant":"test"}"""), - Row(4, null, null, null, null, null, null, - null, null, null, null, null, null, null, null, - null, null, null, null, null) - ) - - val differences = actualData.zip(expectedData).zipWithIndex.flatMap { - case ((actualRow, expectedRow), rowIndex) => - actualRow.toSeq.zip(expectedRow.toSeq).zipWithIndex.collect { - case ((actualValue, expectedValue), colIndex) - if actualValue != expectedValue => - s"Row $rowIndex, Column $colIndex: actual=$actualValue, 
expected=$expectedValue" - } - } + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + session.sql("desc test_source").show(true); + val actualData = session.sql( + """ + |select * from test_source order by id + |""".stripMargin).collect() + + val expectedData = Array( + Row(1, true, 127, 32767, 2147483647, 9223372036854775807L, "170141183460469231731687303715884105727", + 3.14f, 2.71828, new java.math.BigDecimal("12345.6789"), Date.valueOf("2025-03-11"), Timestamp.valueOf("2025-03-11 12:34:56"), "A", "Hello, Doris!", "This is a string", + """["Alice","Bob"]""", Map("key1" -> "value1", "key2" -> "value2"), """{"name":"Tom","age":30}""", + """{"key":"value"}""", """{"type":"variant","data":123}"""), + Row(2, false, -128, -32768, -2147483648, -9223372036854775808L, "-170141183460469231731687303715884105728", + -1.23f, 0.0001, new java.math.BigDecimal("-9999.9999"), Date.valueOf("2024-12-25"), Timestamp.valueOf("2024-12-25 23:59:59"), "B", "Doris Test", "Another string!", + """["Charlie","David"]""", Map("k1" -> "v1", "k2" -> "v2"), """{"name":"Jerry","age":25}""", + """{"status":"ok"}""", """{"data":[1,2,3]}"""), + Row(3, true, 0, 0, 0, 0, "0", + 0.0f, 0.0, new java.math.BigDecimal("0.0000"), Date.valueOf("2023-06-15"), Timestamp.valueOf("2023-06-15 08:00:00"), "C", "Test Doris", "Sample text", + """["Eve","Frank"]""", Map("alpha" -> "beta"), """{"name":"Alice","age":40}""", + """{"nested":{"key":"value"}}""", """{"variant":"test"}"""), + Row(4, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, + null, null, null, null, null) + ) + + val differences = actualData.zip(expectedData).zipWithIndex.flatMap { + case ((actualRow, expectedRow), rowIndex) => + actualRow.toSeq.zip(expectedRow.toSeq).zipWithIndex.collect { + case ((actualValue, expectedValue), colIndex) + if actualValue != expectedValue => + s"Row $rowIndex, Column $colIndex: actual=$actualValue, expected=$expectedValue" + } + } - if (differences.nonEmpty) { - fail(s"Data mismatch found:\n${differences.mkString("\n")}") + if (differences.nonEmpty) { + fail(s"Data mismatch found:\n${differences.mkString("\n")}") + } + } finally { + session.stop() } } @@ -263,27 +274,30 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql") ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) val session = SparkSession.builder().master("local[*]").getOrCreate() - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." 
+ TABLE_READ_TBL_BIT_MAP}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" - |) - |""".stripMargin) - session.sql("desc test_source").show(true); - val actualData = session.sql( - """ - |select * from test_source order by hour - |""".stripMargin).collect() - session.stop() - - assert("List([20200622,1,Read unsupported], [20200622,2,Read unsupported], [20200622,3,Read unsupported])".equals(actualData.toList.toString())) + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + session.sql("desc test_source").show(true); + val actualData = session.sql( + """ + |select * from test_source order by hour + |""".stripMargin).collect() + + assert("List([20200622,1,Read unsupported], [20200622,2,Read unsupported], [20200622,3,Read unsupported])".equals(actualData.toList.toString())) + } finally { + session.stop() + } } @Test @@ -294,29 +308,32 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql") ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) val session = SparkSession.builder().master("local[*]").getOrCreate() - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}", - | "doris.read.bitmap-to-string"="true" - |) - |""".stripMargin) - session.sql("desc test_source").show(true); - val actualData = session.sql( - """ - |select * from test_source order by hour - |""".stripMargin).collect() - session.stop() - - assert("List([20200622,1,243], [20200622,2,1,2,3,4,5,434543], [20200622,3,287667876573])" - .equals(actualData.toList.toString())) + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
+ TABLE_READ_TBL_BIT_MAP}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}", + | "doris.read.bitmap-to-string"="true" + |) + |""".stripMargin) + session.sql("desc test_source").show(true); + val actualData = session.sql( + """ + |select * from test_source order by hour + |""".stripMargin).collect() + + assert("List([20200622,1,243], [20200622,2,1,2,3,4,5,434543], [20200622,3,287667876573])" + .equals(actualData.toList.toString())) + } finally { + session.stop() + } } @Test @@ -327,29 +344,32 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql") ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) val session = SparkSession.builder().master("local[*]").getOrCreate() - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}", - | "doris.read.bitmap-to-base64"="true" - |) - |""".stripMargin) - session.sql("desc test_source").show(true); - val actualData = session.sql( - """ - |select * from test_source order by hour - |""".stripMargin).collect() - session.stop() - - assert("List([20200622,1,AfMAAAA=], [20200622,2,AjswAQABAAAEAAYAAAABAAEABABvoQ==], [20200622,3,A91yV/pCAAAA])" - .equals(actualData.toList.toString())) + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}", + | "doris.read.bitmap-to-base64"="true" + |) + |""".stripMargin) + session.sql("desc test_source").show(true); + val actualData = session.sql( + """ + |select * from test_source order by hour + |""".stripMargin).collect() + + assert("List([20200622,1,AfMAAAA=], [20200622,2,AjswAQABAAAEAAYAAAABAAEABABvoQ==], [20200622,3,A91yV/pCAAAA])" + .equals(actualData.toList.toString())) + } finally { + session.stop() + } } @Test @@ -357,76 +377,79 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql") ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) val session = SparkSession.builder().master("local[*]").getOrCreate() - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." 
+ TABLE_READ_TBL_ALL_TYPES}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" - |) - |""".stripMargin) - - val intFilter = session.sql( - """ - |select id,c1,c2 from test_source where id = 2 and c1 = false and c4 != 3 - |""".stripMargin).collect() - - assert("List([2,false,-128])".equals(intFilter.toList.toString())) - - val floatFilter = session.sql( - """ - |select id,c3,c4,c7,c9 from test_source where c7 > 0 and c7 < 3.15 - |""".stripMargin).collect() - - assert("List([1,32767,2147483647,3.14,12345.6789])".equals(floatFilter.toList.toString())) - - val dateFilter = session.sql( - """ - |select id,c10,c11 from test_source where c10 = '2025-03-11' and c13 like 'Hello%' - |""".stripMargin).collect() - - assert("List([1,2025-03-11,2025-03-11 12:34:56.0])".equals(dateFilter.toList.toString())) - - val datetimeFilter = session.sql( - """ - |select id,c11,c12 from test_source where c10 < '2025-03-11' and c11 = '2024-12-25 23:59:59' - |""".stripMargin).collect() - - assert("List([2,2024-12-25 23:59:59.0,B])".equals(datetimeFilter.toList.toString())) - - val stringFilter = session.sql( - """ - |select id,c13,c14 from test_source where c11 >= '2024-12-25 23:59:59' and c13 = 'Hello, Doris!' - |""".stripMargin).collect() - - assert("List([1,Hello, Doris!,This is a string])".equals(stringFilter.toList.toString())) - - val nullFilter = session.sql( - """ - |select id,c13,c14 from test_source where c14 is null - |""".stripMargin).collect() - - assert("List([4,null,null])".equals(nullFilter.toList.toString())) - - val notNullFilter = session.sql( - """ - |select id from test_source where c15 is not null and c12 in ('A', 'B') - |""".stripMargin).collect() - - assert("List([1], [2])".equals(notNullFilter.toList.toString())) - - val likeFilter = session.sql( - """ - |select id from test_source where c19 like '%variant%' and c13 like 'Test%' - |""".stripMargin).collect() - - assert("List([3])".equals(likeFilter.toList.toString())) - session.stop() + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
+ TABLE_READ_TBL_ALL_TYPES}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + + val intFilter = session.sql( + """ + |select id,c1,c2 from test_source where id = 2 and c1 = false and c4 != 3 + |""".stripMargin).collect() + + assert("List([2,false,-128])".equals(intFilter.toList.toString())) + + val floatFilter = session.sql( + """ + |select id,c3,c4,c7,c9 from test_source where c7 > 0 and c7 < 3.15 + |""".stripMargin).collect() + + assert("List([1,32767,2147483647,3.14,12345.6789])".equals(floatFilter.toList.toString())) + + val dateFilter = session.sql( + """ + |select id,c10,c11 from test_source where c10 = '2025-03-11' and c13 like 'Hello%' + |""".stripMargin).collect() + + assert("List([1,2025-03-11,2025-03-11 12:34:56.0])".equals(dateFilter.toList.toString())) + + val datetimeFilter = session.sql( + """ + |select id,c11,c12 from test_source where c10 < '2025-03-11' and c11 = '2024-12-25 23:59:59' + |""".stripMargin).collect() + + assert("List([2,2024-12-25 23:59:59.0,B])".equals(datetimeFilter.toList.toString())) + + val stringFilter = session.sql( + """ + |select id,c13,c14 from test_source where c11 >= '2024-12-25 23:59:59' and c13 = 'Hello, Doris!' + |""".stripMargin).collect() + + assert("List([1,Hello, Doris!,This is a string])".equals(stringFilter.toList.toString())) + + val nullFilter = session.sql( + """ + |select id,c13,c14 from test_source where c14 is null + |""".stripMargin).collect() + + assert("List([4,null,null])".equals(nullFilter.toList.toString())) + + val notNullFilter = session.sql( + """ + |select id from test_source where c15 is not null and c12 in ('A', 'B') + |""".stripMargin).collect() + + assert("List([1], [2])".equals(notNullFilter.toList.toString())) + + val likeFilter = session.sql( + """ + |select id from test_source where c19 like '%variant%' and c13 like 'Test%' + |""".stripMargin).collect() + + assert("List([3])".equals(likeFilter.toList.toString())) + } finally { + session.stop() + } } @Test @@ -438,65 +461,67 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo String.format("insert into %s.%s values ('中文',60)", DATABASE, TABLE_READ_UTF8_TBL)) val session = SparkSession.builder().master("local[*]").getOrCreate() - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." + TABLE_READ_UTF8_TBL}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" - |) - |""".stripMargin) - - val utf8Filter = session.sql( - """ - |select name,age from test_source where name = '中文' - |""".stripMargin).collect() - - assert("List([中文,60])".equals(utf8Filter.toList.toString())) - session.stop() + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
+ TABLE_READ_UTF8_TBL}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + + val utf8Filter = session.sql( + """ + |select name,age from test_source where name = '中文' + |""".stripMargin).collect() + + assert("List([中文,60])".equals(utf8Filter.toList.toString())) + } finally { + session.stop() + } } - @Test def buildCaseWhenTest(): Unit = { val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql") ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*) val session = SparkSession.builder().master("local[*]").getOrCreate() - - session.sql( - s""" - |CREATE TEMPORARY VIEW test_source - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.read.mode"="${readMode}", - | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" - |) - |""".stripMargin) - - val resultData = session.sql( - """ - |select * from ( - | select - | id, - | (case when c5 > 10 then c2 else null end) as cc1, - | (case when c4 < 5 then c3 else null end) as cc2 - | from test_source where c2 is not null - |) where !(cc1 is null and cc2 is null) order by id - |""".stripMargin) - - assert("List([1,127,null], [2,null,-32768], [3,null,0])".equals(resultData.collect().toList.toString())) - - session.stop() - + try { + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.read.mode"="${readMode}", + | "doris.read.arrow-flight-sql.port"="${flightSqlPort}" + |) + |""".stripMargin) + + val resultData = session.sql( + """ + |select * from ( + | select + | id, + | (case when c5 > 10 then c2 else null end) as cc1, + | (case when c4 < 5 then c3 else null end) as cc2 + | from test_source where c2 is not null + |) where !(cc1 is null and cc2 is null) order by id + |""".stripMargin) + + assert("List([1,127,null], [2,null,-32768], [3,null,0])".equals(resultData.collect().toList.toString())) + } finally { + session.stop() + } } } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala index 51201e4..8244c35 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala @@ -43,40 +43,44 @@ class DorisWriterITCase extends AbstractContainerTestBase { val TABLE_JSON_TBL: String = "tbl_json_tbl" val TABLE_JSON_TBL_OVERWRITE: String = "tbl_json_tbl_overwrite" val TABLE_JSON_TBL_ARROW: String = "tbl_json_tbl_arrow" + val TABLE_BITMAP_TBL: String = "tbl_write_tbl_bitmap" @Test @throws[Exception] def testSinkCsvFormat(): Unit = { initializeTable(TABLE_CSV, DataModel.DUPLICATE) val session = SparkSession.builder().master("local[1]").getOrCreate() - val df = session.createDataFrame(Seq( - ("doris_csv", 1), - ("spark_csv", 2) - )).toDF("name", "age") - df.write - .format("doris") - 
.option("doris.fenodes", getFenodes) - .option("doris.sink.auto-redirect", false) - .option("doris.table.identifier", DATABASE + "." + TABLE_CSV) - .option("user", getDorisUsername) - .option("password", getDorisPassword) - .option("sink.properties.column_separator", ",") - .option("sink.properties.line_delimiter", "\n") - .option("sink.properties.format", "csv") - .option("doris.sink.batch.interval.ms", "5000") - .option("doris.sink.batch.size", "1") - .mode(SaveMode.Append) - .save() - session.stop() + try { + val df = session.createDataFrame(Seq( + ("doris_csv", 1), + ("spark_csv", 2) + )).toDF("name", "age") + df.write + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.sink.auto-redirect", false) + .option("doris.table.identifier", DATABASE + "." + TABLE_CSV) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.column_separator", ",") + .option("sink.properties.line_delimiter", "\n") + .option("sink.properties.format", "csv") + .option("doris.sink.batch.interval.ms", "5000") + .option("doris.sink.batch.size", "1") + .mode(SaveMode.Append) + .save() - Thread.sleep(15000) - val actual = ContainerUtils.executeSQLStatement( - getDorisQueryConnection, - LOG, - String.format("select * from %s.%s", DATABASE, TABLE_CSV), - 2) - val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") - checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(), actual.toArray) + Thread.sleep(15000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_CSV), + 2) + val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") + checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(), actual.toArray) + } finally { + session.stop() + } } @Test @@ -84,31 +88,34 @@ class DorisWriterITCase extends AbstractContainerTestBase { def testSinkCsvFormatHideSep(): Unit = { initializeTable(TABLE_CSV_HIDE_SEP, DataModel.AGGREGATE) val session = SparkSession.builder().master("local[*]").getOrCreate() - val df = session.createDataFrame(Seq( - ("doris_csv", 1), - ("spark_csv", 2) - )).toDF("name", "age") - df.write - .format("doris") - .option("doris.fenodes", getFenodes + "," + getFenodes) - .option("doris.table.identifier", DATABASE + "." + TABLE_CSV_HIDE_SEP) - .option("user", getDorisUsername) - .option("password", getDorisPassword) - .option("sink.properties.column_separator", "\\x01") - .option("sink.properties.line_delimiter", "\\x02") - .option("sink.properties.format", "csv") - .mode(SaveMode.Append) - .save() - session.stop() + try { + val df = session.createDataFrame(Seq( + ("doris_csv", 1), + ("spark_csv", 2) + )).toDF("name", "age") + df.write + .format("doris") + .option("doris.fenodes", getFenodes + "," + getFenodes) + .option("doris.table.identifier", DATABASE + "." 
+ TABLE_CSV_HIDE_SEP) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.column_separator", "\\x01") + .option("sink.properties.line_delimiter", "\\x02") + .option("sink.properties.format", "csv") + .mode(SaveMode.Append) + .save() - Thread.sleep(10000) - val actual = ContainerUtils.executeSQLStatement( - getDorisQueryConnection, - LOG, - String.format("select * from %s.%s", DATABASE, TABLE_CSV_HIDE_SEP), - 2) - val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") - checkResultInAnyOrder("testSinkCsvFormatHideSep", expected.toArray(), actual.toArray) + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_CSV_HIDE_SEP), + 2) + val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") + checkResultInAnyOrder("testSinkCsvFormatHideSep", expected.toArray(), actual.toArray) + } finally { + session.stop() + } } @Test @@ -116,29 +123,32 @@ class DorisWriterITCase extends AbstractContainerTestBase { def testSinkGroupCommit(): Unit = { initializeTable(TABLE_GROUP_COMMIT, DataModel.DUPLICATE) val session = SparkSession.builder().master("local[*]").getOrCreate() - val df = session.createDataFrame(Seq( - ("doris_csv", 1), - ("spark_csv", 2) - )).toDF("name", "age") - df.write - .format("doris") - .option("doris.fenodes", getFenodes) - .option("doris.table.identifier", DATABASE + "." + TABLE_GROUP_COMMIT) - .option("user", getDorisUsername) - .option("password", getDorisPassword) - .option("sink.properties.group_commit", "sync_mode") - .mode(SaveMode.Append) - .save() - session.stop() + try { + val df = session.createDataFrame(Seq( + ("doris_csv", 1), + ("spark_csv", 2) + )).toDF("name", "age") + df.write + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.table.identifier", DATABASE + "." + TABLE_GROUP_COMMIT) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.group_commit", "sync_mode") + .mode(SaveMode.Append) + .save() - Thread.sleep(10000) - val actual = ContainerUtils.executeSQLStatement( - getDorisQueryConnection, - LOG, - String.format("select * from %s.%s", DATABASE, TABLE_GROUP_COMMIT), - 2) - val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") - checkResultInAnyOrder("testSinkGroupCommit", expected.toArray(), actual.toArray) + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_GROUP_COMMIT), + 2) + val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2") + checkResultInAnyOrder("testSinkGroupCommit", expected.toArray(), actual.toArray) + } finally { + session.stop() + } } @Test @@ -146,31 +156,34 @@ class DorisWriterITCase extends AbstractContainerTestBase { def testSinkEmptyPartition(): Unit = { initializeTable(TABLE_JSON_EMPTY_PARTITION, DataModel.AGGREGATE) val session = SparkSession.builder().master("local[2]").getOrCreate() - val df = session.createDataFrame(Seq( - ("doris_json", 1) - )).toDF("name", "age") - df.repartition(2).write - .format("doris") - .option("doris.fenodes", getFenodes) - .option("doris.table.identifier", DATABASE + "." 
+ TABLE_JSON_EMPTY_PARTITION) - .option("user", getDorisUsername) - .option("password", getDorisPassword) - .option("sink.properties.read_json_by_line", "true") - .option("sink.properties.format", "json") - .option("doris.sink.auto-redirect", "false") - .option("doris.sink.enable-2pc", "true") - .mode(SaveMode.Append) - .save() - session.stop() + try { + val df = session.createDataFrame(Seq( + ("doris_json", 1) + )).toDF("name", "age") + df.repartition(2).write + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_EMPTY_PARTITION) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.read_json_by_line", "true") + .option("sink.properties.format", "json") + .option("doris.sink.auto-redirect", "false") + .option("doris.sink.enable-2pc", "true") + .mode(SaveMode.Append) + .save() - Thread.sleep(10000) - val actual = ContainerUtils.executeSQLStatement( - getDorisQueryConnection, - LOG, - String.format("select * from %s.%s", DATABASE, TABLE_JSON_EMPTY_PARTITION), - 2) - val expected = util.Arrays.asList("doris_json,1"); - checkResultInAnyOrder("testSinkEmptyPartition", expected.toArray, actual.toArray) + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON_EMPTY_PARTITION), + 2) + val expected = util.Arrays.asList("doris_json,1"); + checkResultInAnyOrder("testSinkEmptyPartition", expected.toArray, actual.toArray) + } finally { + session.stop() + } } @Test @@ -178,31 +191,34 @@ class DorisWriterITCase extends AbstractContainerTestBase { def testSinkArrowFormat(): Unit = { initializeTable(TABLE_JSON_TBL_ARROW, DataModel.DUPLICATE) val session = SparkSession.builder().master("local[*]").getOrCreate() - val df = session.createDataFrame(Seq( - ("doris_json", 1), - ("spark_json", 2) - )).toDF("name", "age") - df.write - .format("doris") - .option("doris.fenodes", getFenodes) - .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_TBL_ARROW) - .option("user", getDorisUsername) - .option("password", getDorisPassword) - .option("sink.properties.format", "arrow") - .option("doris.sink.batch.size", "1") - .option("doris.sink.enable-2pc", "true") - .mode(SaveMode.Append) - .save() - session.stop() + try { + val df = session.createDataFrame(Seq( + ("doris_json", 1), + ("spark_json", 2) + )).toDF("name", "age") + df.write + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.table.identifier", DATABASE + "." 
+ TABLE_JSON_TBL_ARROW) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.format", "arrow") + .option("doris.sink.batch.size", "1") + .option("doris.sink.enable-2pc", "true") + .mode(SaveMode.Append) + .save() - Thread.sleep(10000) - val actual = ContainerUtils.executeSQLStatement( - getDorisQueryConnection, - LOG, - String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_ARROW), - 2) - val expected = util.Arrays.asList("doris_json,1", "spark_json,2"); - checkResultInAnyOrder("testSinkArrowFormat", expected.toArray, actual.toArray) + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_ARROW), + 2) + val expected = util.Arrays.asList("doris_json,1", "spark_json,2"); + checkResultInAnyOrder("testSinkArrowFormat", expected.toArray, actual.toArray) + } finally { + session.stop() + } } @Test @@ -210,31 +226,34 @@ class DorisWriterITCase extends AbstractContainerTestBase { def testSinkJsonFormat(): Unit = { initializeTable(TABLE_JSON, DataModel.UNIQUE) val session = SparkSession.builder().master("local[*]").getOrCreate() - val df = session.createDataFrame(Seq( - ("doris_json", 1), - ("spark_json", 2) - )).toDF("name", "age") - df.write - .format("doris") - .option("doris.fenodes", getFenodes) - .option("doris.table.identifier", DATABASE + "." + TABLE_JSON) - .option("user", getDorisUsername) - .option("password", getDorisPassword) - .option("sink.properties.read_json_by_line", "true") - .option("sink.properties.format", "json") - .option("doris.sink.auto-redirect", "false") - .mode(SaveMode.Append) - .save() - session.stop() + try { + val df = session.createDataFrame(Seq( + ("doris_json", 1), + ("spark_json", 2) + )).toDF("name", "age") + df.write + .format("doris") + .option("doris.fenodes", getFenodes) + .option("doris.table.identifier", DATABASE + "." + TABLE_JSON) + .option("user", getDorisUsername) + .option("password", getDorisPassword) + .option("sink.properties.read_json_by_line", "true") + .option("sink.properties.format", "json") + .option("doris.sink.auto-redirect", "false") + .mode(SaveMode.Append) + .save() - Thread.sleep(10000) - val actual = ContainerUtils.executeSQLStatement( - getDorisQueryConnection, - LOG, - String.format("select * from %s.%s", DATABASE, TABLE_JSON), - 2) - val expected = util.Arrays.asList("doris_json,1", "spark_json,2"); - checkResultInAnyOrder("testSinkJsonFormat", expected.toArray, actual.toArray) + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON), + 2) + val expected = util.Arrays.asList("doris_json,1", "spark_json,2"); + checkResultInAnyOrder("testSinkJsonFormat", expected.toArray, actual.toArray) + } finally { + session.stop() + } } @Test @@ -242,36 +261,39 @@ class DorisWriterITCase extends AbstractContainerTestBase { def testSQLSinkFormat(): Unit = { initializeTable(TABLE_JSON_TBL, DataModel.UNIQUE_MOR) val session = SparkSession.builder().master("local[*]").getOrCreate() - val df = session.createDataFrame(Seq( - ("doris_tbl", 1), - ("spark_tbl", 2) - )).toDF("name", "age") - df.createTempView("mock_source") - session.sql( - s""" - |CREATE TEMPORARY VIEW test_sink - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." 
+ TABLE_JSON_TBL}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}" - |) - |""".stripMargin) - session.sql( - """ - |insert into test_sink select name,age from mock_source - |""".stripMargin) - session.stop() + try { + val df = session.createDataFrame(Seq( + ("doris_tbl", 1), + ("spark_tbl", 2) + )).toDF("name", "age") + df.createTempView("mock_source") + session.sql( + s""" + |CREATE TEMPORARY VIEW test_sink + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}" + |) + |""".stripMargin) + session.sql( + """ + |insert into test_sink select name,age from mock_source + |""".stripMargin) - Thread.sleep(10000) - val actual = ContainerUtils.executeSQLStatement( - getDorisQueryConnection, - LOG, - String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL), - 2) - val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2"); - checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, actual.toArray) + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL), + 2) + val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2"); + checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, actual.toArray) + } finally { + session.stop() + } } @Test @@ -286,41 +308,88 @@ class DorisWriterITCase extends AbstractContainerTestBase { String.format("insert into %s.%s values ('history-spark',1110)", DATABASE, TABLE_JSON_TBL_OVERWRITE)) val session = SparkSession.builder().master("local[*]").getOrCreate() - val df = session.createDataFrame(Seq( - ("doris_tbl", 1), - ("spark_tbl", 2) - )).toDF("name", "age") - df.createTempView("mock_source") - session.sql( - s""" - |CREATE TEMPORARY VIEW test_sink - |USING doris - |OPTIONS( - | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL_OVERWRITE}", - | "fenodes"="${getFenodes}", - | "user"="${getDorisUsername}", - | "password"="${getDorisPassword}", - | "doris.query.port"="${getQueryPort}", - | "doris.sink.label.prefix"="doris-label-customer", - | "doris.sink.enable-2pc"="true" - |) - |""".stripMargin) - session.sql( - """ - |insert overwrite table test_sink select name,age from mock_source - |""".stripMargin) - session.stop() + try { + val df = session.createDataFrame(Seq( + ("doris_tbl", 1), + ("spark_tbl", 2) + )).toDF("name", "age") + df.createTempView("mock_source") + session.sql( + s""" + |CREATE TEMPORARY VIEW test_sink + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." 
+ TABLE_JSON_TBL_OVERWRITE}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.query.port"="${getQueryPort}", + | "doris.sink.label.prefix"="doris-label-customer", + | "doris.sink.enable-2pc"="true" + |) + |""".stripMargin) + session.sql( + """ + |insert overwrite table test_sink select name,age from mock_source + |""".stripMargin) - Thread.sleep(10000) - val actual = ContainerUtils.executeSQLStatement( - getDorisQueryConnection, - LOG, - String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_OVERWRITE), - 2) - val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2"); - checkResultInAnyOrder("testSQLSinkOverwrite", expected.toArray, actual.toArray) + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_OVERWRITE), + 2) + val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2"); + checkResultInAnyOrder("testSQLSinkOverwrite", expected.toArray, actual.toArray) + } finally { + session.stop() + } } + @Test + def testWriteBitmap(): Unit = { + val targetInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/write_bitmap.sql") + ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, targetInitSql: _*) + + val session = SparkSession.builder().master("local[*]").getOrCreate() + try { + val df = session.createDataFrame(Seq( + (20200621, 1, "243"), + (20200622, 2, "1"), + (20200623, 3, "287667876573") + )).toDF("datekey", "hour", "device_id") + df.createTempView("mock_source") + session.sql( + s""" + |CREATE TEMPORARY VIEW test_sink + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_BITMAP_TBL}", + | "fenodes"="${getFenodes}", + | "user"="${getDorisUsername}", + | "password"="${getDorisPassword}", + | "doris.write.fields"="datekey,hour,device_id,device_id=to_bitmap(device_id)" + |) + |""".stripMargin) + session.sql( + """ + |insert into test_sink select datekey,hour,device_id from mock_source + |""".stripMargin) + + Thread.sleep(10000) + val actual = ContainerUtils.executeSQLStatement( + getDorisQueryConnection, + LOG, + String.format("select datekey,hour,bitmap_to_string(device_id) from %s.%s", DATABASE, TABLE_BITMAP_TBL), + 3) + val expected = util.Arrays.asList("20200621,1,243", "20200622,2,1", "20200623,3,287667876573"); + checkResultInAnyOrder("testWriteBitmap", expected.toArray, actual.toArray) + } finally { + session.stop() + } + } + + private def initializeTable(table: String, dataModel: DataModel): Unit = { val max = if (DataModel.AGGREGATE == dataModel) "MAX" else "" val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\"" @@ -341,7 +410,7 @@ class DorisWriterITCase extends AbstractContainerTestBase { } private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = { - LOG.info("Checking DorisWriterFailoverITCase result. testName={}, actual={}, expected={}", testName, actual, expected) + LOG.info("Checking DorisWriterITCase result. 
testName={}, actual={}, expected={}", testName, actual, expected) assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava) } } diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_bitmap.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_bitmap.sql new file mode 100644 index 0000000..0c76164 --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_bitmap.sql @@ -0,0 +1,12 @@ +DROP TABLE IF EXISTS tbl_write_tbl_bitmap; + +create table tbl_write_tbl_bitmap ( +datekey int, +hour int, +device_id bitmap BITMAP_UNION +) +aggregate key (datekey, hour) +distributed by hash(datekey, hour) buckets 1 +properties( + "replication_num" = "1" +); \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala index 1913a51..cba20f1 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -17,8 +17,10 @@ package org.apache.doris.spark.read.expression +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or} import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference} +import org.apache.spark.sql.types.{DateType, TimestampType} class V2ExpressionBuilder(inValueLengthLimit: Int) { @@ -36,7 +38,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { case _: AlwaysFalse => "1=0" case expr: Expression => expr match { - case literal: Literal[_] => literal.toString + case literal: Literal[_] => visitLiteral(literal) case namedRef: NamedReference => namedRef.toString case e: GeneralScalarExpression => e.name() match { case "IN" => @@ -76,6 +78,17 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { } } + def visitLiteral(literal: Literal[_]): String = { + if (literal.value() == null) { + return "null" + } + literal.dataType() match { + case DateType => s"'${DateTimeUtils.toJavaDate(literal.value().asInstanceOf[Int]).toString}'" + case TimestampType => s"'${DateTimeUtils.toJavaTimestamp(literal.value().asInstanceOf[Long]).toString}'" + case _ => literal.toString + } + } + def visitStartWith(l: String, r: String): String = { val value = r.substring(1, r.length - 1) s"`$l` LIKE '$value%'" diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala index 1913a51..6e1c010 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -17,8 +17,10 @@ package org.apache.doris.spark.read.expression +import org.apache.spark.sql.catalyst.util.DateTimeUtils import 
org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or} import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference} +import org.apache.spark.sql.types.{DateType, TimestampType} class V2ExpressionBuilder(inValueLengthLimit: Int) { @@ -36,7 +38,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { case _: AlwaysFalse => "1=0" case expr: Expression => expr match { - case literal: Literal[_] => literal.toString + case literal: Literal[_] => visitLiteral(literal) case namedRef: NamedReference => namedRef.toString case e: GeneralScalarExpression => e.name() match { case "IN" => @@ -76,6 +78,16 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) { } } + def visitLiteral(literal: Literal[_]): String = { + if (literal.value() == null) { + return "null" + } + literal.dataType() match { + case DateType => s"'${DateTimeUtils.toJavaDate(literal.value().asInstanceOf[Int]).toString}'" + case TimestampType => s"'${DateTimeUtils.toJavaTimestamp(literal.value().asInstanceOf[Long]).toString}'" + case _ => literal.toString + } + } def visitStartWith(l: String, r: String): String = { val value = r.substring(1, r.length - 1) s"`$l` LIKE '$value%'" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
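Usage note on the new bitmap write coverage: the testWriteBitmap case added above drives the write path end to end, and the same options can be used from a standalone Spark job. The sketch below is not part of the commit; it mirrors the test, and the FE address, credentials, and database name are placeholder assumptions.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object BitmapWriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("bitmap-write-sketch").getOrCreate()
        try {
          // Rows whose device_id values should end up aggregated into a BITMAP column on the Doris side,
          // matching the tbl_write_tbl_bitmap DDL from container/ddl/write_bitmap.sql.
          val df = spark.createDataFrame(Seq(
            (20200621, 1, "243"),
            (20200622, 2, "1"),
            (20200623, 3, "287667876573")
          )).toDF("datekey", "hour", "device_id")

          df.write
            .format("doris")
            .option("doris.fenodes", "127.0.0.1:8030")                      // placeholder FE address
            .option("doris.table.identifier", "example_db.tbl_write_tbl_bitmap") // placeholder database
            .option("user", "root")                                         // placeholder credentials
            .option("password", "")
            // Route the incoming string column through to_bitmap() during stream load,
            // mirroring the doris.write.fields mapping used in testWriteBitmap.
            .option("doris.write.fields", "datekey,hour,device_id,device_id=to_bitmap(device_id)")
            .mode(SaveMode.Append)
            .save()
        } finally {
          spark.stop()
        }
      }
    }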
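On the read side, the bitmap cases in DorisReaderITCase show that a Doris BITMAP column comes back as the literal string "Read unsupported" unless one of the options exercised here is enabled. A minimal spark-shell style sketch of the two options, again with placeholder connection settings and table name:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      val df = spark.read
        .format("doris")
        .option("doris.fenodes", "127.0.0.1:8030")                      // placeholder FE address
        .option("doris.table.identifier", "example_db.tbl_write_tbl_bitmap") // placeholder table
        .option("doris.user", "root")
        .option("doris.password", "")
        // Either option changes how BITMAP values are materialized, per the reader tests:
        //   doris.read.bitmap-to-string = true  -> "1,2,3,..." style strings
        //   doris.read.bitmap-to-base64 = true  -> base64-encoded bitmap payloads
        .option("doris.read.bitmap-to-string", "true")
        .load()
      df.show(false)
    } finally {
      spark.stop()
    }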
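The visitLiteral addition to the Spark 3.4/3.5 V2ExpressionBuilder exists because Catalyst stores a DateType literal as an Int of days since the epoch and a TimestampType literal as a Long of microseconds, so the previous literal.toString pushed a raw number into the Doris filter. A rough spark-shell style illustration of the conversion the new code relies on (not connector code; the sample values are arbitrary):

    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // DateType literals arrive as Int days-since-epoch, TimestampType literals as Long microseconds.
    val days   = DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2025-03-11"))
    val micros = DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2025-03-11 12:34:56"))

    // What visitLiteral now emits for such literals: quoted, human-readable values
    // usable in a pushed-down predicate such as c10 = '2025-03-11'.
    val dateFilterValue = s"'${DateTimeUtils.toJavaDate(days).toString}'"        // '2025-03-11'
    val tsFilterValue   = s"'${DateTimeUtils.toJavaTimestamp(micros).toString}'" // '2025-03-11 12:34:56.0'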
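Most of the line churn in DorisReaderITCase and DorisWriterITCase is one mechanical change: each test now stops its SparkSession or SparkContext in a finally block, so a failed assertion no longer leaks the local session into later tests. Reduced to a sketch:

    import org.apache.spark.sql.SparkSession

    val session = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      // run the queries and assertions against the session
    } finally {
      session.stop() // previously skipped whenever an assertion threw before reaching stop()
    }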