This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 252ecd3  [SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink
252ecd3 is described below

commit 252ecd333ff7fa65c50e72fec25e7f5ee66bc9e7
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Nov 6 17:08:42 2019 -0800

    [SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink
    
    ### What changes were proposed in this pull request?
    
    This patch leverages the V2 continuous memory stream to extract the tests shared by
    the Kafka micro-batch sink suite and the continuous sink suite, deduplicating them.
    These tests do essentially the same thing, differing only in how they run the query
    and verify the result.
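    
    The extracted hierarchy looks roughly like this (a condensed sketch of the suites in
    the diff below; imports and the shared test bodies are elided):
    
        abstract class KafkaSinkStreamingSuiteBase extends KafkaSinkSuiteBase {
          // hooks each concrete suite fills in; the shared tests only use these
          protected def createMemoryStream(): MemoryStreamBase[String]
          protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit
          protected def defaultTrigger: Option[Trigger]
        }
    
        class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase {
          override protected def createMemoryStream(): MemoryStreamBase[String] = MemoryStream[String]
          override protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit = {
            failAfter(streamingTimeout) { writer.processAllAvailable() }
            verifyFn
          }
          override protected def defaultTrigger: Option[Trigger] = None
        }
    
        class KafkaContinuousSinkSuite extends KafkaSinkStreamingSuiteBase {
          override protected def createMemoryStream(): MemoryStreamBase[String] =
            ContinuousMemoryStream.singlePartition[String]
          override protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit = {
            eventually(timeout(streamingTimeout), interval(5.seconds)) { verifyFn }
          }
          override protected def defaultTrigger: Option[Trigger] = Some(Trigger.Continuous(1000))
        }
    
    Each shared test obtains its source via createMemoryStream() and checks output through
    verifyResult(), so the micro-batch and continuous suites differ only in these overrides
    (plus the continuous trigger).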
    
    ### Why are the changes needed?
    
    We no longer have the same tests duplicated in two places - this removes roughly
    300 lines.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
    
    Closes #26292 from HeartSaVioR/SPARK-29635.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../main/scala/org/apache/spark/TestUtils.scala    |  16 +-
 .../sql/kafka010/KafkaContinuousSinkSuite.scala    | 395 ---------------------
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 265 ++++++++------
 3 files changed, 177 insertions(+), 499 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 5d79394..1f06364 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -24,7 +24,7 @@ import java.nio.file.{Files => JavaFiles}
 import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE}
 import java.security.SecureRandom
 import java.security.cert.X509Certificate
-import java.util.{Arrays, EnumSet, Properties}
+import java.util.{Arrays, EnumSet, Locale, Properties}
 import java.util.concurrent.{TimeoutException, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream, Manifest}
 import javax.net.ssl._
@@ -214,12 +214,20 @@ private[spark] object TestUtils {
    * Asserts that exception message contains the message. Please note this checks all
    * exceptions in the tree.
    */
-  def assertExceptionMsg(exception: Throwable, msg: String): Unit = {
+  def assertExceptionMsg(exception: Throwable, msg: String, ignoreCase: Boolean = false): Unit = {
+    def contain(msg1: String, msg2: String): Boolean = {
+      if (ignoreCase) {
+        msg1.toLowerCase(Locale.ROOT).contains(msg2.toLowerCase(Locale.ROOT))
+      } else {
+        msg1.contains(msg2)
+      }
+    }
+
     var e = exception
-    var contains = e.getMessage.contains(msg)
+    var contains = contain(e.getMessage, msg)
     while (e.getCause != null && !contains) {
       e = e.getCause
-      contains = e.getMessage.contains(msg)
+      contains = contain(e.getMessage, msg)
     }
     assert(contains, s"Exception tree doesn't contain the expected message: $msg")
   }
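
For reference, test code can opt into the new case-insensitive matching like this (a minimal usage sketch, where ex stands for any caught exception):

    import org.apache.spark.TestUtils
    // matches anywhere in the cause chain, ignoring case differences in the message
    TestUtils.assertExceptionMsg(ex, "job aborted", ignoreCase = true)
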
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
deleted file mode 100644
index 031f609..0000000
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.util.Locale
-
-import scala.reflect.ClassTag
-
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
-import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.streaming._
-import org.apache.spark.sql.types.{BinaryType, DataType}
-import org.apache.spark.util.Utils
-
-/**
- * This is a temporary port of KafkaSinkSuite, since we do not yet have a V2 memory stream.
- * Once we have one, this will be changed to a specialization of KafkaSinkSuite and we won't have
- * to duplicate all the code.
- */
-class KafkaContinuousSinkSuite extends KafkaContinuousTest {
-  import testImplicits._
-
-  override val streamingTimeout = 30.seconds
-
-  override val brokerProps = Map("auto.create.topics.enable" -> "false")
-
-  override def afterAll(): Unit = {
-    if (testUtils != null) {
-      testUtils.teardown()
-      testUtils = null
-    }
-    super.afterAll()
-  }
-
-  test("streaming - write to kafka with topic field") {
-    val inputTopic = newTopic()
-    testUtils.createTopic(inputTopic, partitions = 1)
-
-    val input = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", inputTopic)
-      .option("startingOffsets", "earliest")
-      .load()
-
-    val topic = newTopic()
-    testUtils.createTopic(topic)
-
-    val writer = createKafkaWriter(
-      input.toDF(),
-      withTopic = None,
-      withOutputMode = Some(OutputMode.Append))(
-      withSelectExpr = s"'$topic' as topic", "value")
-
-    val reader = createKafkaReader(topic)
-      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
-      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-      .as[(Option[Int], Int)]
-      .map(_._2)
-
-    try {
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
-      }
-      testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
-      eventually(timeout(streamingTimeout)) {
-        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-      }
-    } finally {
-      writer.stop()
-    }
-  }
-
-  test("streaming - write w/o topic field, with topic option") {
-    val inputTopic = newTopic()
-    testUtils.createTopic(inputTopic, partitions = 1)
-
-    val input = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", inputTopic)
-      .option("startingOffsets", "earliest")
-      .load()
-
-    val topic = newTopic()
-    testUtils.createTopic(topic)
-
-    val writer = createKafkaWriter(
-      input.toDF(),
-      withTopic = Some(topic),
-      withOutputMode = Some(OutputMode.Append()))()
-
-    val reader = createKafkaReader(topic)
-      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
-      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-      .as[(Option[Int], Int)]
-      .map(_._2)
-
-    try {
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
-      }
-      testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
-      eventually(timeout(streamingTimeout)) {
-        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-      }
-    } finally {
-      writer.stop()
-    }
-  }
-
-  test("streaming - topic field and topic option") {
-    /* The purpose of this test is to ensure that the topic option
-     * overrides the topic field. We begin by writing some data that
-     * includes a topic field and value (e.g., 'foo') along with a topic
-     * option. Then when we read from the topic specified in the option
-     * we should see the data i.e., the data was written to the topic
-     * option, and not to the topic in the data e.g., foo
-     */
-    val inputTopic = newTopic()
-    testUtils.createTopic(inputTopic, partitions = 1)
-
-    val input = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", inputTopic)
-      .option("startingOffsets", "earliest")
-      .load()
-
-    val topic = newTopic()
-    testUtils.createTopic(topic)
-
-    val writer = createKafkaWriter(
-      input.toDF(),
-      withTopic = Some(topic),
-      withOutputMode = Some(OutputMode.Append()))(
-      withSelectExpr = "'foo' as topic", "CAST(value as STRING) value")
-
-    val reader = createKafkaReader(topic)
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
-      .as[(Option[Int], Int)]
-      .map(_._2)
-
-    try {
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
-      }
-      testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
-      eventually(timeout(streamingTimeout)) {
-        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-      }
-    } finally {
-      writer.stop()
-    }
-  }
-
-  test("null topic attribute") {
-    val inputTopic = newTopic()
-    testUtils.createTopic(inputTopic, partitions = 1)
-
-    val input = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", inputTopic)
-      .option("startingOffsets", "earliest")
-      .load()
-    val topic = newTopic()
-    testUtils.createTopic(topic)
-
-    runAndVerifyException[StreamingQueryException](inputTopic, "null topic present in the data.") {
-      createKafkaWriter(input.toDF())(withSelectExpr = "CAST(null as STRING) as topic", "value")
-    }
-  }
-
-  test("streaming - write data with bad schema") {
-    val inputTopic = newTopic()
-    testUtils.createTopic(inputTopic, partitions = 1)
-
-    val input = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", inputTopic)
-      .option("startingOffsets", "earliest")
-      .load()
-    val topic = newTopic()
-    testUtils.createTopic(topic)
-
-    assertWrongSchema(topic, input, Seq("value as key", "value"),
-      "topic option required when no 'topic' attribute is present")
-    assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "value as key"),
-      "required attribute 'value' not found")
-  }
-
-  test("streaming - write data with valid schema but wrong types") {
-    val inputTopic = newTopic()
-    testUtils.createTopic(inputTopic, partitions = 1)
-
-    val input = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", inputTopic)
-      .option("startingOffsets", "earliest")
-      .load()
-      .selectExpr("CAST(value as STRING) value")
-      .toDF()
-    val topic = newTopic()
-    testUtils.createTopic(topic)
-
-    assertWrongSchema(topic, input, Seq("CAST('1' as INT) as topic", "value"),
-      "topic must be a(n) string")
-    assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "CAST(value as INT) as value"),
-      "value must be a(n) string or binary")
-    assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "CAST(value as INT) as key", "value"),
-      "key must be a(n) string or binary")
-    assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "value as partition", "value"),
-      "partition must be a(n) int")
-  }
-
-  test("streaming - write to non-existing topic") {
-    val inputTopic = newTopic()
-    testUtils.createTopic(inputTopic, partitions = 1)
-
-    val input = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", inputTopic)
-      .option("startingOffsets", "earliest")
-      .load()
-    val topic = newTopic()
-
-    runAndVerifyException[StreamingQueryException](inputTopic, "job aborted") {
-      createKafkaWriter(input.toDF(), withTopic = Some(topic))()
-    }
-  }
-
-  test("streaming - exception on config serializer") {
-    val inputTopic = newTopic()
-    testUtils.createTopic(inputTopic, partitions = 1)
-    testUtils.sendMessages(inputTopic, Array("0"))
-
-    val input = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", inputTopic)
-      .load()
-
-    assertWrongOption(inputTopic, input.toDF(), Map("kafka.key.serializer" -> "foo"),
-      "kafka option 'key.serializer' is not supported")
-    assertWrongOption(inputTopic, input.toDF(), Map("kafka.value.serializer" -> "foo"),
-      "kafka option 'value.serializer' is not supported")
-  }
-
-  test("generic - write big data with small producer buffer") {
-    /* This test ensures that we understand the semantics of Kafka when
-    * is comes to blocking on a call to send when the send buffer is full.
-    * This test will configure the smallest possible producer buffer and
-    * indicate that we should block when it is full. Thus, no exception should
-    * be thrown in the case of a full buffer.
-    */
-    val topic = newTopic()
-    testUtils.createTopic(topic, 1)
-    val options = new java.util.HashMap[String, Object]
-    options.put("bootstrap.servers", testUtils.brokerAddress)
-    options.put("buffer.memory", "16384") // min buffer size
-    options.put("block.on.buffer.full", "true")
-    options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
-    options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
-    val inputSchema = Seq(AttributeReference("value", BinaryType)())
-    val data = new Array[Byte](15000) // large value
-    val writeTask = new KafkaDataWriter(Some(topic), options, inputSchema)
-    try {
-      val fieldTypes: Array[DataType] = Array(BinaryType)
-      val converter = UnsafeProjection.create(fieldTypes)
-      val row = new SpecificInternalRow(fieldTypes)
-      row.update(0, data)
-      val iter = Seq.fill(1000)(converter.apply(row)).iterator
-      iter.foreach(writeTask.write(_))
-      writeTask.commit()
-    } finally {
-      writeTask.close()
-    }
-  }
-
-  private def createKafkaReader(topic: String): DataFrame = {
-    spark.read
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("startingOffsets", "earliest")
-      .option("endingOffsets", "latest")
-      .option("subscribe", topic)
-      .load()
-  }
-
-  private def createKafkaWriter(
-      input: DataFrame,
-      withTopic: Option[String] = None,
-      withOutputMode: Option[OutputMode] = None,
-      withOptions: Map[String, String] = Map[String, String]())
-      (withSelectExpr: String*): StreamingQuery = {
-    var stream: DataStreamWriter[Row] = null
-    val checkpointDir = Utils.createTempDir()
-    var df = input.toDF()
-    if (withSelectExpr.length > 0) {
-      df = df.selectExpr(withSelectExpr: _*)
-    }
-    stream = df.writeStream
-      .format("kafka")
-      .option("checkpointLocation", checkpointDir.getCanonicalPath)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      // We need to reduce blocking time to efficiently test non-existent partition behavior.
-      .option("kafka.max.block.ms", "1000")
-      .trigger(Trigger.Continuous(1000))
-      .queryName("kafkaStream")
-    withTopic.foreach(stream.option("topic", _))
-    withOutputMode.foreach(stream.outputMode(_))
-    withOptions.foreach(opt => stream.option(opt._1, opt._2))
-    stream.start()
-  }
-
-  private def runAndVerifyException[T <: Exception : ClassTag](
-      inputTopic: String,
-      expectErrorMsg: String)(
-      writerFn: => StreamingQuery): Unit = {
-    var writer: StreamingQuery = null
-    val ex: Exception = try {
-      intercept[T] {
-        writer = writerFn
-        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-        eventually(timeout(streamingTimeout)) {
-          assert(writer.exception.isDefined)
-        }
-        throw writer.exception.get
-      }
-    } finally {
-      if (writer != null) writer.stop()
-    }
-    val rootException = ex match {
-      case e: StreamingQueryException => e.getCause.getCause
-      case e => e
-    }
-    assert(rootException.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg))
-  }
-
-  private def assertWrongSchema(
-      inputTopic: String,
-      input: DataFrame,
-      selectExpr: Seq[String],
-      expectErrorMsg: String): Unit = {
-    runAndVerifyException[AnalysisException](inputTopic, expectErrorMsg) {
-      createKafkaWriter(input)(withSelectExpr = selectExpr: _*)
-    }
-  }
-
-  private def assertWrongOption(
-      inputTopic: String,
-      input: DataFrame,
-      options: Map[String, String],
-      expectErrorMsg: String): Unit = {
-    runAndVerifyException[IllegalArgumentException](inputTopic, expectErrorMsg) {
-      createKafkaWriter(input, withOptions = options)()
-    }
-  }
-}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 1705d76..e2dcd62 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -18,23 +18,26 @@
 package org.apache.spark.sql.kafka010
 
 import java.nio.charset.StandardCharsets.UTF_8
-import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.reflect.ClassTag
+
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner
 import org.apache.kafka.common.Cluster
 import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.concurrent.TimeLimits.failAfter
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamBase}
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
 import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType, StructField, StructType}
 
 abstract class KafkaSinkSuiteBase extends QueryTest with SharedSparkSession with KafkaTest {
@@ -74,13 +77,17 @@ abstract class KafkaSinkSuiteBase extends QueryTest with SharedSparkSession with
   }
 }
 
-class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
+abstract class KafkaSinkStreamingSuiteBase extends KafkaSinkSuiteBase {
   import testImplicits._
 
-  override val streamingTimeout = 30.seconds
+  protected val streamingTimeout = 30.seconds
+
+  protected def createMemoryStream(): MemoryStreamBase[String]
+  protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit
+  protected def defaultTrigger: Option[Trigger]
 
   test("streaming - write to kafka with topic field") {
-    val input = MemoryStream[String]
+    val input = createMemoryStream()
     val topic = newTopic()
     testUtils.createTopic(topic)
 
@@ -96,55 +103,29 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
       .as[(Option[Int], Int)]
       .map(_._2)
 
-    try {
-      input.addData("1", "2", "3", "4", "5")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
-      input.addData("6", "7", "8", "9", "10")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-    } finally {
-      writer.stop()
-    }
+    runAndVerifyValues(input, writer, reader)
   }
 
-  test("streaming - write aggregation w/o topic field, with topic option") {
-    val input = MemoryStream[String]
+  test("streaming - write w/o topic field, with topic option") {
+    val input = createMemoryStream()
     val topic = newTopic()
     testUtils.createTopic(topic)
 
     val writer = createKafkaWriter(
-      input.toDF().groupBy("value").count(),
+      input.toDF(),
       withTopic = Some(topic),
-      withOutputMode = Some(OutputMode.Update()))(
-      withSelectExpr = "CAST(value as STRING) key", "CAST(count as STRING) 
value")
+      withOutputMode = Some(OutputMode.Append()))()
 
     val reader = createKafkaReader(topic)
       .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
       .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-      .as[(Int, Int)]
+      .as[(Option[Int], Int)]
+      .map(_._2)
 
-    try {
-      input.addData("1", "2", "2", "3", "3", "3")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
-      input.addData("1", "2", "3")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
-    } finally {
-      writer.stop()
-    }
+    runAndVerifyValues(input, writer, reader)
   }
 
-  test("streaming - aggregation with topic field and topic option") {
+  test("streaming - topic field and topic option") {
     /* The purpose of this test is to ensure that the topic option
      * overrides the topic field. We begin by writing some data that
      * includes a topic field and value (e.g., 'foo') along with a topic
@@ -152,62 +133,27 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
      * we should see the data i.e., the data was written to the topic
      * option, and not to the topic in the data e.g., foo
      */
-    val input = MemoryStream[String]
+    val input = createMemoryStream()
     val topic = newTopic()
     testUtils.createTopic(topic)
 
     val writer = createKafkaWriter(
-      input.toDF().groupBy("value").count(),
+      input.toDF(),
       withTopic = Some(topic),
-      withOutputMode = Some(OutputMode.Update()))(
-      withSelectExpr = "'foo' as topic",
-        "CAST(value as STRING) key", "CAST(count as STRING) value")
+      withOutputMode = Some(OutputMode.Append()))(
+      withSelectExpr = "'foo' as topic", "value")
 
     val reader = createKafkaReader(topic)
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
-      .as[(Int, Int)]
-
-    try {
-      input.addData("1", "2", "2", "3", "3", "3")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
-      input.addData("1", "2", "3")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
-    } finally {
-      writer.stop()
-    }
-  }
-
-  test("streaming - sink progress is produced") {
-    /* ensure sink progress is correctly produced. */
-    val input = MemoryStream[String]
-    val topic = newTopic()
-    testUtils.createTopic(topic)
-
-    val writer = createKafkaWriter(
-      input.toDF(),
-      withTopic = Some(topic),
-      withOutputMode = Some(OutputMode.Update()))()
+      .as[(Option[Int], Int)]
+      .map(_._2)
 
-    try {
-      input.addData("1", "2", "3")
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
-      }
-      assert(writer.lastProgress.sink.numOutputRows == 3L)
-    } finally {
-      writer.stop()
-    }
+    runAndVerifyValues(input, writer, reader)
   }
 
   test("streaming - write data with bad schema") {
-    val input = MemoryStream[String]
+    val input = createMemoryStream()
     val topic = newTopic()
     testUtils.createTopic(topic)
 
@@ -218,7 +164,7 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
   }
 
   test("streaming - write data with valid schema but wrong types") {
-    val input = MemoryStream[String]
+    val input = createMemoryStream()
     val topic = newTopic()
     testUtils.createTopic(topic)
 
@@ -233,15 +179,15 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
   }
 
   test("streaming - write to non-existing topic") {
-    val input = MemoryStream[String]
+    val input = createMemoryStream()
 
-    runAndVerifyStreamingQueryException(input, "job aborted") {
+    runAndVerifyException[StreamingQueryException](input, "job aborted") {
       createKafkaWriter(input.toDF(), withTopic = Some(newTopic()))()
     }
   }
 
   test("streaming - exception on config serializer") {
-    val input = MemoryStream[String]
+    val input = createMemoryStream()
 
     assertWrongOption(input, Map("kafka.key.serializer" -> "foo"),
       "kafka option 'key.serializer' is not supported")
@@ -249,7 +195,7 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
       "kafka option 'value.serializer' is not supported")
   }
 
-  private def createKafkaWriter(
+  protected def createKafkaWriter(
       input: DataFrame,
       withTopic: Option[String] = None,
       withOutputMode: Option[OutputMode] = None,
@@ -270,46 +216,165 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
       withTopic.foreach(stream.option("topic", _))
       withOutputMode.foreach(stream.outputMode(_))
       withOptions.foreach(opt => stream.option(opt._1, opt._2))
+      defaultTrigger.foreach(stream.trigger(_))
     }
     stream.start()
   }
 
-  private def runAndVerifyStreamingQueryException(
-      input: MemoryStream[String],
+  private def runAndVerifyValues(
+      input: MemoryStreamBase[String],
+      writer: StreamingQuery,
+      reader: Dataset[Int]): Unit = {
+    try {
+      input.addData("1", "2", "3", "4", "5")
+      verifyResult(writer)(checkDatasetUnorderly(reader, 1, 2, 3, 4, 5))
+      input.addData("6", "7", "8", "9", "10")
+      verifyResult(writer)(checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6,
+        7, 8, 9, 10))
+    } finally {
+      writer.stop()
+    }
+  }
+
+  private def runAndVerifyException[T <: Exception : ClassTag](
+      input: MemoryStreamBase[String],
       expectErrorMsg: String)(
       writerFn: => StreamingQuery): Unit = {
     var writer: StreamingQuery = null
     val ex: Exception = try {
-      intercept[StreamingQueryException] {
+      intercept[T] {
         writer = writerFn
         input.addData("1", "2", "3", "4", "5")
-        writer.processAllAvailable()
+        input match {
+          case _: MemoryStream[String] => writer.processAllAvailable()
+          case _: ContinuousMemoryStream[String] =>
+            eventually(timeout(streamingTimeout)) {
+              assert(writer.exception.isDefined)
+            }
+
+            throw writer.exception.get
+        }
       }
     } finally {
       if (writer != null) writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg))
+    TestUtils.assertExceptionMsg(ex, expectErrorMsg, ignoreCase = true)
   }
 
   private def assertWrongSchema(
-      input: MemoryStream[String],
+      input: MemoryStreamBase[String],
       selectExpr: Seq[String],
       expectErrorMsg: String): Unit = {
-    runAndVerifyStreamingQueryException(input, expectErrorMsg) {
-      createKafkaWriter(input.toDF())(withSelectExpr = selectExpr: _*)
+    // just pick common exception of both micro-batch and continuous cases
+    runAndVerifyException[Exception](input, expectErrorMsg) {
+      createKafkaWriter(input.toDF())(
+        withSelectExpr = selectExpr: _*)
     }
   }
 
   private def assertWrongOption(
-      input: MemoryStream[String],
+      input: MemoryStreamBase[String],
       options: Map[String, String],
       expectErrorMsg: String): Unit = {
-    runAndVerifyStreamingQueryException(input, expectErrorMsg) {
+    // just pick common exception of both micro-batch and continuous cases
+    runAndVerifyException[Exception](input, expectErrorMsg) {
       createKafkaWriter(input.toDF(), withOptions = options)()
     }
   }
 }
 
+class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase {
+  import testImplicits._
+
+  override val streamingTimeout = 30.seconds
+
+  override protected def createMemoryStream(): MemoryStreamBase[String] = MemoryStream[String]
+
+  override protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit = {
+    failAfter(streamingTimeout) {
+      writer.processAllAvailable()
+    }
+    verifyFn
+  }
+
+  override protected def defaultTrigger: Option[Trigger] = None
+
+  test("streaming - sink progress is produced") {
+    /* ensure sink progress is correctly produced. */
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))()
+
+    try {
+      input.addData("1", "2", "3")
+      verifyResult(writer) {
+        assert(writer.lastProgress.sink.numOutputRows == 3L)
+      }
+    } finally {
+      writer.stop()
+    }
+  }
+}
+
+class KafkaContinuousSinkSuite extends KafkaSinkStreamingSuiteBase {
+  import testImplicits._
+
+  // We need more than the default local[2] to be able to schedule all partitions simultaneously.
+  override protected def createSparkSession = new TestSparkSession(
+    new SparkContext(
+      "local[10]",
+      "continuous-stream-test-sql-context",
+      sparkConf.set("spark.sql.testkey", "true")))
+
+  override protected def createMemoryStream(): MemoryStreamBase[String] = {
+    ContinuousMemoryStream.singlePartition[String]
+  }
+
+  override protected def verifyResult(writer: StreamingQuery)(verifyFn: => Unit): Unit = {
+    eventually(timeout(streamingTimeout), interval(5.seconds)) {
+      verifyFn
+    }
+  }
+
+  override protected def defaultTrigger: Option[Trigger] = Some(Trigger.Continuous(1000))
+
+  test("generic - write big data with small producer buffer") {
+    /* This test ensures that we understand the semantics of Kafka when
+    * it comes to blocking on a call to send when the send buffer is full.
+    * This test will configure the smallest possible producer buffer and
+    * indicate that we should block when it is full. Thus, no exception should
+    * be thrown in the case of a full buffer.
+    */
+    val topic = newTopic()
+    testUtils.createTopic(topic, 1)
+    val options = new java.util.HashMap[String, Object]
+    options.put("bootstrap.servers", testUtils.brokerAddress)
+    options.put("buffer.memory", "16384") // min buffer size
+    options.put("block.on.buffer.full", "true")
+    options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+    options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+    val inputSchema = Seq(AttributeReference("value", BinaryType)())
+    val data = new Array[Byte](15000) // large value
+    val writeTask = new KafkaDataWriter(Some(topic), options, inputSchema)
+    try {
+      val fieldTypes: Array[DataType] = Array(BinaryType)
+      val converter = UnsafeProjection.create(fieldTypes)
+      val row = new SpecificInternalRow(fieldTypes)
+      row.update(0, data)
+      val iter = Seq.fill(1000)(converter.apply(row)).iterator
+      iter.foreach(writeTask.write(_))
+      writeTask.commit()
+    } finally {
+      writeTask.close()
+    }
+  }
+}
+
 abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
   import testImplicits._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
