This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new ee2af05  [Improve](test) add structured streaming case (#302)
ee2af05 is described below

commit ee2af05310bce9793bbf099faf2f5097b2cb6a6c
Author: wudi <676366...@qq.com>
AuthorDate: Tue Apr 1 14:24:07 2025 +0800

    [Improve](test) add structured streaming case (#302)
---
 .../spark/sql/DorisStreamingWriterITCase.scala     | 116 +++++++++++++++++++++
 .../spark/sql/DorisWriterFailoverITCase.scala      |   8 +-
 2 files changed, 123 insertions(+), 1 deletion(-)

diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisStreamingWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisStreamingWriterITCase.scala
new file mode 100644
index 0000000..530fda2
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisStreamingWriterITCase.scala
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.container.AbstractContainerTestBase.getDorisQueryConnection
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.doris.spark.rest.models.DataModel
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+import org.junit.{Before, Test}
+import org.slf4j.LoggerFactory
+
+/**
+ * Doris Structured Streaming test case.
+ */
+class DorisStreamingWriterITCase extends AbstractContainerTestBase {
+
+  private val LOG = LoggerFactory.getLogger(classOf[DorisStreamingWriterITCase])
+  val DATABASE = "test_doris_streaming"
+  val TABLE_WRITE_TBL_STREAM = "tbl_write_tbl_stream"
+
+  @Before
+  def setUp(): Unit = {
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+  }
+
+  @Test
+  def testStreamingWriter(): Unit = {
+    initializeTable(TABLE_WRITE_TBL_STREAM, DataModel.DUPLICATE)
+    val spark = SparkSession.builder()
+      .appName("RateSourceExample")
+      .master("local[1]")
+      .getOrCreate()
+
+    val rateStream = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", 1)
+      .load()
+
+    // the rate source emits (timestamp, value) rows
+    val dorisSink = rateStream.writeStream
+      .format("doris")
+      .option("checkpointLocation", "/tmp/checkpoint")
+      .option("doris.table.identifier", DATABASE + "." + TABLE_WRITE_TBL_STREAM)
+      .option("doris.fenodes", getFenodes)
+      .option("user", getDorisUsername)
+      .option("password", getDorisPassword)
+      .start()
+
+    var totalCount = 0L
+    spark.streams.addListener(new StreamingQueryListener {
+      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
+
+      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+        val progress: StreamingQueryProgress = event.progress
+        val numInputRows = progress.numInputRows
+        totalCount += numInputRows
+        println(s"Processed $numInputRows rows, Total: $totalCount")
+
+        if (totalCount >= 10) {
+          println("Total count reached 10, stopping query.")
+          dorisSink.stop()
+        }
+      }
+
+      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
+    })
+
+    dorisSink.awaitTermination()
+
+    spark.stop()
+    val cnt = ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection(DATABASE),
+      LOG,
+      String.format("SELECT count(1) FROM %s.%s", DATABASE, TABLE_WRITE_TBL_STREAM),
+      1
+    )
+    assert(cnt.get(0).toInt == totalCount)
+  }
+
+  private def initializeTable(table: String, dataModel: DataModel): Unit = {
+    val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
+    val morProps = if (DataModel.UNIQUE_MOR == dataModel) ",\"enable_unique_key_merge_on_write\" = \"false\"" else ""
+    val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+      String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+      String.format("CREATE TABLE %s.%s (\n" +
+        "`timestamp` DATETIME,\n" +
+        "`value` int %s\n" +
+        ") " +
+        " %s KEY(`timestamp`) " +
+        " DISTRIBUTED BY HASH(`timestamp`) BUCKETS 1\n" +
+        "PROPERTIES (" +
+        "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, max, model))
+  }
+}
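For context, the write path the new test exercises reduces to a small standalone program: Spark's built-in rate source streamed into the connector's "doris" sink. The sketch below reuses only the sink options visible in the diff above and assumes the spark-doris-connector jar is on the classpath; the FE address, credentials, table identifier, and checkpoint path are placeholders, not values from the commit.

import org.apache.spark.sql.SparkSession

object DorisStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DorisStreamingSketch")
      .master("local[1]")
      .getOrCreate()

    // Spark's rate source emits (timestamp, value) rows at a fixed rate,
    // matching the (DATETIME, int) schema the test creates in Doris.
    val rateStream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()

    val query = rateStream.writeStream
      .format("doris")
      .option("checkpointLocation", "/tmp/checkpoint")   // placeholder checkpoint dir
      .option("doris.table.identifier", "test_doris_streaming.tbl_write_tbl_stream")
      .option("doris.fenodes", "127.0.0.1:8030")         // placeholder FE HTTP address
      .option("user", "root")                            // placeholder credentials
      .option("password", "")
      .start()

    query.awaitTermination()
  }
}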
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
index bbaf7bd..7c1d48f 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
@@ -51,6 +51,7 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
 
   @Test
   def testFailoverForRetry(): Unit = {
+    LOG.info("start to test testFailoverForRetry.")
     initializeTable(TABLE_WRITE_TBL_RETRY, DataModel.DUPLICATE)
     val session = SparkSession.builder().master("local[1]").getOrCreate()
     val df = session.createDataFrame(Seq(
@@ -125,8 +126,9 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
    */
   @Test
   def testFailoverForTaskRetry(): Unit = {
+    LOG.info("start to test testFailoverForTaskRetry.")
     initializeTable(TABLE_WRITE_TBL_TASK_RETRY, DataModel.DUPLICATE)
-    val session = SparkSession.builder().master("local[1,100]").getOrCreate()
+    val session = SparkSession.builder().master("local[1,1000]").getOrCreate()
     val df = session.createDataFrame(Seq(
       ("doris", "cn"),
       ("spark", "us"),
@@ -167,6 +169,7 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
         try {
           // query may fail
           result = ContainerUtils.executeSQLStatement(connection, LOG, query, 15).asScala.toList
+          Thread.sleep(10)
         } catch {
           case ex: Exception =>
             LOG.error("Failed to query result, cause " + ex.getMessage)
@@ -184,6 +187,9 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
 
     future.get(60, TimeUnit.SECONDS)
     session.stop()
+
+    // make sure the publish succeeds before checking the row count
+    Thread.sleep(5000)
     val actual = ContainerUtils.executeSQLStatement(
       getDorisQueryConnection,
       LOG,
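One change above deserves a note: the master URL moves from local[1,100] to local[1,1000]. In Spark's local mode, a master of the form local[N,F] runs N worker threads and allows any single task to fail up to F times before the whole job is failed (the local-mode counterpart of spark.task.maxFailures), so the bump gives the injected-failure test ten times the retry headroom. A minimal illustration of the syntax, plain Spark with nothing connector-specific:

import org.apache.spark.sql.SparkSession

object LocalMasterSketch {
  def main(args: Array[String]): Unit = {
    // local[1,1000]: one worker thread; a task may fail and be retried
    // up to 1000 times before Spark gives up on the job.
    val session = SparkSession.builder()
      .master("local[1,1000]")
      .getOrCreate()

    println(session.sparkContext.master) // prints: local[1,1000]
    session.stop()
  }
}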