This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 252ecd3 [SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink 252ecd3 is described below commit 252ecd333ff7fa65c50e72fec25e7f5ee66bc9e7 Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> AuthorDate: Wed Nov 6 17:08:42 2019 -0800 [SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink ### What changes were proposed in this pull request? This patch leverages V2 continuous memory stream to extract tests from Kafka micro-batch sink suite and continuous sink suite and deduplicate them. These tests are basically doing the same, except how to run and verify the result. ### Why are the changes needed? We no longer have same tests spotted on two places - brings 300 lines deletion. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UTs. Closes #26292 from HeartSaVioR/SPARK-29635. Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../main/scala/org/apache/spark/TestUtils.scala | 16 +- .../sql/kafka010/KafkaContinuousSinkSuite.scala | 395 --------------------- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 265 ++++++++------ 3 files changed, 177 insertions(+), 499 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 5d79394..1f06364 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -24,7 +24,7 @@ import java.nio.file.{Files => JavaFiles} import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE} import java.security.SecureRandom import java.security.cert.X509Certificate -import java.util.{Arrays, EnumSet, Properties} +import java.util.{Arrays, EnumSet, Locale, Properties} import java.util.concurrent.{TimeoutException, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream, Manifest} import javax.net.ssl._ @@ -214,12 +214,20 @@ private[spark] object TestUtils { * Asserts that exception message contains the message. Please note this checks all * exceptions in the tree. */ - def assertExceptionMsg(exception: Throwable, msg: String): Unit = { + def assertExceptionMsg(exception: Throwable, msg: String, ignoreCase: Boolean = false): Unit = { + def contain(msg1: String, msg2: String): Boolean = { + if (ignoreCase) { + msg1.toLowerCase(Locale.ROOT).contains(msg2.toLowerCase(Locale.ROOT)) + } else { + msg1.contains(msg2) + } + } + var e = exception - var contains = e.getMessage.contains(msg) + var contains = contain(e.getMessage, msg) while (e.getCause != null && !contains) { e = e.getCause - contains = e.getMessage.contains(msg) + contains = contain(e.getMessage, msg) } assert(contains, s"Exception tree doesn't contain the expected message: $msg") } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala deleted file mode 100644 index 031f609..0000000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import java.util.Locale - -import scala.reflect.ClassTag - -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.scalatest.time.SpanSugar._ - -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} -import org.apache.spark.sql.execution.streaming.MemoryStream -import org.apache.spark.sql.streaming._ -import org.apache.spark.sql.types.{BinaryType, DataType} -import org.apache.spark.util.Utils - -/** - * This is a temporary port of KafkaSinkSuite, since we do not yet have a V2 memory stream. - * Once we have one, this will be changed to a specialization of KafkaSinkSuite and we won't have - * to duplicate all the code. - */ -class KafkaContinuousSinkSuite extends KafkaContinuousTest { - import testImplicits._ - - override val streamingTimeout = 30.seconds - - override val brokerProps = Map("auto.create.topics.enable" -> "false") - - override def afterAll(): Unit = { - if (testUtils != null) { - testUtils.teardown() - testUtils = null - } - super.afterAll() - } - - test("streaming - write to kafka with topic field") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - - val topic = newTopic() - testUtils.createTopic(topic) - - val writer = createKafkaWriter( - input.toDF(), - withTopic = None, - withOutputMode = Some(OutputMode.Append))( - withSelectExpr = s"'$topic' as topic", "value") - - val reader = createKafkaReader(topic) - .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value") - .selectExpr("CAST(key as INT) key", "CAST(value as INT) value") - .as[(Option[Int], Int)] - .map(_._2) - - try { - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - eventually(timeout(streamingTimeout)) { - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5) - } - testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10")) - eventually(timeout(streamingTimeout)) { - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - } - } finally { - writer.stop() - } - } - - test("streaming - write w/o topic field, with topic option") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - - val topic = newTopic() - testUtils.createTopic(topic) - - val writer = createKafkaWriter( - 
input.toDF(), - withTopic = Some(topic), - withOutputMode = Some(OutputMode.Append()))() - - val reader = createKafkaReader(topic) - .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value") - .selectExpr("CAST(key as INT) key", "CAST(value as INT) value") - .as[(Option[Int], Int)] - .map(_._2) - - try { - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - eventually(timeout(streamingTimeout)) { - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5) - } - testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10")) - eventually(timeout(streamingTimeout)) { - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - } - } finally { - writer.stop() - } - } - - test("streaming - topic field and topic option") { - /* The purpose of this test is to ensure that the topic option - * overrides the topic field. We begin by writing some data that - * includes a topic field and value (e.g., 'foo') along with a topic - * option. Then when we read from the topic specified in the option - * we should see the data i.e., the data was written to the topic - * option, and not to the topic in the data e.g., foo - */ - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - - val topic = newTopic() - testUtils.createTopic(topic) - - val writer = createKafkaWriter( - input.toDF(), - withTopic = Some(topic), - withOutputMode = Some(OutputMode.Append()))( - withSelectExpr = "'foo' as topic", "CAST(value as STRING) value") - - val reader = createKafkaReader(topic) - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .selectExpr("CAST(key AS INT)", "CAST(value AS INT)") - .as[(Option[Int], Int)] - .map(_._2) - - try { - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - eventually(timeout(streamingTimeout)) { - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5) - } - testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10")) - eventually(timeout(streamingTimeout)) { - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - } - } finally { - writer.stop() - } - } - - test("null topic attribute") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - val topic = newTopic() - testUtils.createTopic(topic) - - runAndVerifyException[StreamingQueryException](inputTopic, "null topic present in the data.") { - createKafkaWriter(input.toDF())(withSelectExpr = "CAST(null as STRING) as topic", "value") - } - } - - test("streaming - write data with bad schema") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - val topic = newTopic() - testUtils.createTopic(topic) - - assertWrongSchema(topic, input, Seq("value as key", "value"), - "topic option required when no 'topic' attribute is present") - assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "value as key"), - "required attribute 'value' not found") - } - - test("streaming - write data with valid schema but 
wrong types") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - .selectExpr("CAST(value as STRING) value") - .toDF() - val topic = newTopic() - testUtils.createTopic(topic) - - assertWrongSchema(topic, input, Seq("CAST('1' as INT) as topic", "value"), - "topic must be a(n) string") - assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "CAST(value as INT) as value"), - "value must be a(n) string or binary") - assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "CAST(value as INT) as key", "value"), - "key must be a(n) string or binary") - assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "value as partition", "value"), - "partition must be a(n) int") - } - - test("streaming - write to non-existing topic") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - val topic = newTopic() - - runAndVerifyException[StreamingQueryException](inputTopic, "job aborted") { - createKafkaWriter(input.toDF(), withTopic = Some(topic))() - } - } - - test("streaming - exception on config serializer") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - testUtils.sendMessages(inputTopic, Array("0")) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .load() - - assertWrongOption(inputTopic, input.toDF(), Map("kafka.key.serializer" -> "foo"), - "kafka option 'key.serializer' is not supported") - assertWrongOption(inputTopic, input.toDF(), Map("kafka.value.serializer" -> "foo"), - "kafka option 'value.serializer' is not supported") - } - - test("generic - write big data with small producer buffer") { - /* This test ensures that we understand the semantics of Kafka when - * is comes to blocking on a call to send when the send buffer is full. - * This test will configure the smallest possible producer buffer and - * indicate that we should block when it is full. Thus, no exception should - * be thrown in the case of a full buffer. 
- */ - val topic = newTopic() - testUtils.createTopic(topic, 1) - val options = new java.util.HashMap[String, Object] - options.put("bootstrap.servers", testUtils.brokerAddress) - options.put("buffer.memory", "16384") // min buffer size - options.put("block.on.buffer.full", "true") - options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) - options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) - val inputSchema = Seq(AttributeReference("value", BinaryType)()) - val data = new Array[Byte](15000) // large value - val writeTask = new KafkaDataWriter(Some(topic), options, inputSchema) - try { - val fieldTypes: Array[DataType] = Array(BinaryType) - val converter = UnsafeProjection.create(fieldTypes) - val row = new SpecificInternalRow(fieldTypes) - row.update(0, data) - val iter = Seq.fill(1000)(converter.apply(row)).iterator - iter.foreach(writeTask.write(_)) - writeTask.commit() - } finally { - writeTask.close() - } - } - - private def createKafkaReader(topic: String): DataFrame = { - spark.read - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("startingOffsets", "earliest") - .option("endingOffsets", "latest") - .option("subscribe", topic) - .load() - } - - private def createKafkaWriter( - input: DataFrame, - withTopic: Option[String] = None, - withOutputMode: Option[OutputMode] = None, - withOptions: Map[String, String] = Map[String, String]()) - (withSelectExpr: String*): StreamingQuery = { - var stream: DataStreamWriter[Row] = null - val checkpointDir = Utils.createTempDir() - var df = input.toDF() - if (withSelectExpr.length > 0) { - df = df.selectExpr(withSelectExpr: _*) - } - stream = df.writeStream - .format("kafka") - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - // We need to reduce blocking time to efficiently test non-existent partition behavior. 
- .option("kafka.max.block.ms", "1000") - .trigger(Trigger.Continuous(1000)) - .queryName("kafkaStream") - withTopic.foreach(stream.option("topic", _)) - withOutputMode.foreach(stream.outputMode(_)) - withOptions.foreach(opt => stream.option(opt._1, opt._2)) - stream.start() - } - - private def runAndVerifyException[T <: Exception : ClassTag]( - inputTopic: String, - expectErrorMsg: String)( - writerFn: => StreamingQuery): Unit = { - var writer: StreamingQuery = null - val ex: Exception = try { - intercept[T] { - writer = writerFn - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - eventually(timeout(streamingTimeout)) { - assert(writer.exception.isDefined) - } - throw writer.exception.get - } - } finally { - if (writer != null) writer.stop() - } - val rootException = ex match { - case e: StreamingQueryException => e.getCause.getCause - case e => e - } - assert(rootException.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg)) - } - - private def assertWrongSchema( - inputTopic: String, - input: DataFrame, - selectExpr: Seq[String], - expectErrorMsg: String): Unit = { - runAndVerifyException[AnalysisException](inputTopic, expectErrorMsg) { - createKafkaWriter(input)(withSelectExpr = selectExpr: _*) - } - } - - private def assertWrongOption( - inputTopic: String, - input: DataFrame, - options: Map[String, String], - expectErrorMsg: String): Unit = { - runAndVerifyException[IllegalArgumentException](inputTopic, expectErrorMsg) { - createKafkaWriter(input, withOptions = options)() - } - } -} diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 1705d76..e2dcd62 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -18,23 +18,26 @@ package org.apache.spark.sql.kafka010 import java.nio.charset.StandardCharsets.UTF_8 -import java.util.Locale import java.util.concurrent.atomic.AtomicInteger +import scala.reflect.ClassTag + import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.internals.DefaultPartitioner import org.apache.kafka.common.Cluster import org.apache.kafka.common.serialization.ByteArraySerializer +import org.scalatest.concurrent.TimeLimits.failAfter import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkConf, SparkException, TestUtils} +import org.apache.spark.{SparkConf, SparkContext, SparkException, TestUtils} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} -import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamBase} +import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ -import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession} import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType, StructField, StructType} abstract class KafkaSinkSuiteBase extends QueryTest with SharedSparkSession with KafkaTest { @@ -74,13 +77,17 @@ abstract class KafkaSinkSuiteBase extends QueryTest with SharedSparkSession with } } 
-class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest { +abstract class KafkaSinkStreamingSuiteBase extends KafkaSinkSuiteBase { import testImplicits._ - override val streamingTimeout = 30.seconds + protected val streamingTimeout = 30.seconds + + protected def createMemoryStream(): MemoryStreamBase[String] + protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit + protected def defaultTrigger: Option[Trigger] test("streaming - write to kafka with topic field") { - val input = MemoryStream[String] + val input = createMemoryStream() val topic = newTopic() testUtils.createTopic(topic) @@ -96,55 +103,29 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest { .as[(Option[Int], Int)] .map(_._2) - try { - input.addData("1", "2", "3", "4", "5") - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5) - input.addData("6", "7", "8", "9", "10") - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - } finally { - writer.stop() - } + runAndVerifyValues(input, writer, reader) } - test("streaming - write aggregation w/o topic field, with topic option") { - val input = MemoryStream[String] + test("streaming - write w/o topic field, with topic option") { + val input = createMemoryStream() val topic = newTopic() testUtils.createTopic(topic) val writer = createKafkaWriter( - input.toDF().groupBy("value").count(), + input.toDF(), withTopic = Some(topic), - withOutputMode = Some(OutputMode.Update()))( - withSelectExpr = "CAST(value as STRING) key", "CAST(count as STRING) value") + withOutputMode = Some(OutputMode.Append()))() val reader = createKafkaReader(topic) .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value") .selectExpr("CAST(key as INT) key", "CAST(value as INT) value") - .as[(Int, Int)] + .as[(Option[Int], Int)] + .map(_._2) - try { - input.addData("1", "2", "2", "3", "3", "3") - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3)) - input.addData("1", "2", "3") - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4)) - } finally { - writer.stop() - } + runAndVerifyValues(input, writer, reader) } - test("streaming - aggregation with topic field and topic option") { + test("streaming - topic field and topic option") { /* The purpose of this test is to ensure that the topic option * overrides the topic field. 
We begin by writing some data that * includes a topic field and value (e.g., 'foo') along with a topic @@ -152,62 +133,27 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest { * we should see the data i.e., the data was written to the topic * option, and not to the topic in the data e.g., foo */ - val input = MemoryStream[String] + val input = createMemoryStream() val topic = newTopic() testUtils.createTopic(topic) val writer = createKafkaWriter( - input.toDF().groupBy("value").count(), + input.toDF(), withTopic = Some(topic), - withOutputMode = Some(OutputMode.Update()))( - withSelectExpr = "'foo' as topic", - "CAST(value as STRING) key", "CAST(count as STRING) value") + withOutputMode = Some(OutputMode.Append()))( + withSelectExpr = "'foo' as topic", "value") val reader = createKafkaReader(topic) .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .selectExpr("CAST(key AS INT)", "CAST(value AS INT)") - .as[(Int, Int)] - - try { - input.addData("1", "2", "2", "3", "3", "3") - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3)) - input.addData("1", "2", "3") - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4)) - } finally { - writer.stop() - } - } - - test("streaming - sink progress is produced") { - /* ensure sink progress is correctly produced. */ - val input = MemoryStream[String] - val topic = newTopic() - testUtils.createTopic(topic) - - val writer = createKafkaWriter( - input.toDF(), - withTopic = Some(topic), - withOutputMode = Some(OutputMode.Update()))() + .as[(Option[Int], Int)] + .map(_._2) - try { - input.addData("1", "2", "3") - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - assert(writer.lastProgress.sink.numOutputRows == 3L) - } finally { - writer.stop() - } + runAndVerifyValues(input, writer, reader) } test("streaming - write data with bad schema") { - val input = MemoryStream[String] + val input = createMemoryStream() val topic = newTopic() testUtils.createTopic(topic) @@ -218,7 +164,7 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest { } test("streaming - write data with valid schema but wrong types") { - val input = MemoryStream[String] + val input = createMemoryStream() val topic = newTopic() testUtils.createTopic(topic) @@ -233,15 +179,15 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest { } test("streaming - write to non-existing topic") { - val input = MemoryStream[String] + val input = createMemoryStream() - runAndVerifyStreamingQueryException(input, "job aborted") { + runAndVerifyException[StreamingQueryException](input, "job aborted") { createKafkaWriter(input.toDF(), withTopic = Some(newTopic()))() } } test("streaming - exception on config serializer") { - val input = MemoryStream[String] + val input = createMemoryStream() assertWrongOption(input, Map("kafka.key.serializer" -> "foo"), "kafka option 'key.serializer' is not supported") @@ -249,7 +195,7 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest { "kafka option 'value.serializer' is not supported") } - private def createKafkaWriter( + protected def createKafkaWriter( input: DataFrame, withTopic: Option[String] = None, withOutputMode: Option[OutputMode] = None, @@ -270,46 +216,165 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest { withTopic.foreach(stream.option("topic", _)) 
withOutputMode.foreach(stream.outputMode(_)) withOptions.foreach(opt => stream.option(opt._1, opt._2)) + defaultTrigger.foreach(stream.trigger(_)) } stream.start() } - private def runAndVerifyStreamingQueryException( - input: MemoryStream[String], + private def runAndVerifyValues( + input: MemoryStreamBase[String], + writer: StreamingQuery, + reader: Dataset[Int]): Unit = { + try { + input.addData("1", "2", "3", "4", "5") + verifyResult(writer)(checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)) + input.addData("6", "7", "8", "9", "10") + verifyResult(writer)(checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, + 7, 8, 9, 10)) + } finally { + writer.stop() + } + } + + private def runAndVerifyException[T <: Exception : ClassTag]( + input: MemoryStreamBase[String], expectErrorMsg: String)( writerFn: => StreamingQuery): Unit = { var writer: StreamingQuery = null val ex: Exception = try { - intercept[StreamingQueryException] { + intercept[T] { writer = writerFn input.addData("1", "2", "3", "4", "5") - writer.processAllAvailable() + input match { + case _: MemoryStream[String] => writer.processAllAvailable() + case _: ContinuousMemoryStream[String] => + eventually(timeout(streamingTimeout)) { + assert(writer.exception.isDefined) + } + + throw writer.exception.get + } } } finally { if (writer != null) writer.stop() } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg)) + TestUtils.assertExceptionMsg(ex, expectErrorMsg, ignoreCase = true) } private def assertWrongSchema( - input: MemoryStream[String], + input: MemoryStreamBase[String], selectExpr: Seq[String], expectErrorMsg: String): Unit = { - runAndVerifyStreamingQueryException(input, expectErrorMsg) { - createKafkaWriter(input.toDF())(withSelectExpr = selectExpr: _*) + // just pick common exception of both micro-batch and continuous cases + runAndVerifyException[Exception](input, expectErrorMsg) { + createKafkaWriter(input.toDF())( + withSelectExpr = selectExpr: _*) } } private def assertWrongOption( - input: MemoryStream[String], + input: MemoryStreamBase[String], options: Map[String, String], expectErrorMsg: String): Unit = { - runAndVerifyStreamingQueryException(input, expectErrorMsg) { + // just pick common exception of both micro-batch and continuous cases + runAndVerifyException[Exception](input, expectErrorMsg) { createKafkaWriter(input.toDF(), withOptions = options)() } } } +class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase { + import testImplicits._ + + override val streamingTimeout = 30.seconds + + override protected def createMemoryStream(): MemoryStreamBase[String] = MemoryStream[String] + + override protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit = { + failAfter(streamingTimeout) { + writer.processAllAvailable() + } + verifyFn + } + + override protected def defaultTrigger: Option[Trigger] = None + + test("streaming - sink progress is produced") { + /* ensure sink progress is correctly produced. */ + val input = MemoryStream[String] + val topic = newTopic() + testUtils.createTopic(topic) + + val writer = createKafkaWriter( + input.toDF(), + withTopic = Some(topic), + withOutputMode = Some(OutputMode.Update()))() + + try { + input.addData("1", "2", "3") + verifyResult(writer) { + assert(writer.lastProgress.sink.numOutputRows == 3L) + } + } finally { + writer.stop() + } + } +} + +class KafkaContinuousSinkSuite extends KafkaSinkStreamingSuiteBase { + import testImplicits._ + + // We need more than the default local[2] to be able to schedule all partitions simultaneously. 
+ override protected def createSparkSession = new TestSparkSession( + new SparkContext( + "local[10]", + "continuous-stream-test-sql-context", + sparkConf.set("spark.sql.testkey", "true"))) + + override protected def createMemoryStream(): MemoryStreamBase[String] = { + ContinuousMemoryStream.singlePartition[String] + } + + override protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit = { + eventually(timeout(streamingTimeout), interval(5.seconds)) { + verifyFn + } + } + + override protected def defaultTrigger: Option[Trigger] = Some(Trigger.Continuous(1000)) + + test("generic - write big data with small producer buffer") { + /* This test ensures that we understand the semantics of Kafka when + * is comes to blocking on a call to send when the send buffer is full. + * This test will configure the smallest possible producer buffer and + * indicate that we should block when it is full. Thus, no exception should + * be thrown in the case of a full buffer. + */ + val topic = newTopic() + testUtils.createTopic(topic, 1) + val options = new java.util.HashMap[String, Object] + options.put("bootstrap.servers", testUtils.brokerAddress) + options.put("buffer.memory", "16384") // min buffer size + options.put("block.on.buffer.full", "true") + options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) + options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) + val inputSchema = Seq(AttributeReference("value", BinaryType)()) + val data = new Array[Byte](15000) // large value + val writeTask = new KafkaDataWriter(Some(topic), options, inputSchema) + try { + val fieldTypes: Array[DataType] = Array(BinaryType) + val converter = UnsafeProjection.create(fieldTypes) + val row = new SpecificInternalRow(fieldTypes) + row.update(0, data) + val iter = Seq.fill(1000)(converter.apply(row)).iterator + iter.foreach(writeTask.write(_)) + writeTask.commit() + } finally { + writeTask.close() + } + } +} + abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { import testImplicits._ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org