This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a1391cf  [SPARK-31706][SQL] add back the support of streaming update 
mode
a1391cf is described below

commit a1391cf56b3d3c98960e126fb1cf7811a967d74c
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed May 20 03:45:13 2020 +0000

    [SPARK-31706][SQL] add back the support of streaming update mode
    
    ### What changes were proposed in this pull request?
    
    This PR adds a private `WriteBuilder` mixin trait: 
`SupportsStreamingUpdate`, so that the builtin v2 streaming sinks can still 
support the update mode.
    
    Note: it's private because we don't have a proper design yet. I didn't take 
the proposal in 
https://github.com/apache/spark/pull/23702#discussion_r258593059 because we may 
want something more general, like updating by an expression `key1 = key2 + 10`.
    
    ### Why are the changes needed?
    
    In Spark 2.4, all builtin v2 streaming sinks support all streaming output 
modes, and v2 sinks are enabled by default, see 
https://issues.apache.org/jira/browse/SPARK-22911
    
    It's too risky for 3.0 to go back to v1 sinks, so I propose to add a 
private trait to fix builtin v2 sinks, to keep backward compatibility.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now all the builtin v2 streaming sinks support all streaming output 
modes, which is the same as 2.4
    
    ### How was this patch tested?
    
    existing tests.
    
    Closes #28523 from cloud-fan/update.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 34414acfa36d1a91cd6a64761625c8b0bd90c0a7)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  9 +++++---
 .../connector/SupportsStreamingUpdate.scala        | 26 ++++++++++++++++++++++
 .../datasources/noop/NoopDataSource.scala          |  6 +++--
 .../sql/execution/streaming/StreamExecution.scala  | 12 ++++------
 .../spark/sql/execution/streaming/console.scala    |  7 +++---
 .../streaming/sources/ForeachWriterTable.scala     |  8 ++++---
 .../sql/execution/streaming/sources/memory.scala   |  6 ++++-
 7 files changed, 54 insertions(+), 20 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index a5e5d01..ede58bd 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream}
-import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, 
WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, 
SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider, 
SupportsStreamingUpdate}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -394,7 +394,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
       () => new KafkaScan(options)
 
     override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-      new WriteBuilder {
+      new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
         private val options = info.options
         private val inputSchema: StructType = info.schema()
         private val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
@@ -410,6 +410,9 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
           assert(inputSchema != null)
           new KafkaStreamingWrite(topic, producerParams, inputSchema)
         }
+
+        override def truncate(): WriteBuilder = this
+        override def update(): WriteBuilder = this
       }
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala
new file mode 100644
index 0000000..32be74a
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.write.WriteBuilder
+
+// An internal `WriteBuilder` mixin to support UPDATE streaming output mode.
+// TODO: design an official API for streaming output mode UPDATE.
+trait SupportsStreamingUpdate extends WriteBuilder {
+  def update(): WriteBuilder
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 851cc51..8a6c4dc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, 
TableCapability}
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, 
DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, 
WriteBuilder, WriterCommitMessage}
 import 
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, 
StreamingWrite}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider, 
SupportsStreamingUpdate}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -52,8 +52,10 @@ private[noop] object NoopTable extends Table with 
SupportsWrite {
   }
 }
 
-private[noop] object NoopWriteBuilder extends WriteBuilder with 
SupportsTruncate {
+private[noop] object NoopWriteBuilder extends WriteBuilder
+  with SupportsTruncate with SupportsStreamingUpdate {
   override def truncate(): WriteBuilder = this
+  override def update(): WriteBuilder = this
   override def buildForBatch(): BatchWrite = NoopBatchWrite
   override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9b1951a..18fe38c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -629,14 +630,9 @@ abstract class StreamExecution(
         
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
 
       case Update =>
-        // Although no v2 sinks really support Update mode now, but during 
tests we do want them
-        // to pretend to support Update mode, and treat Update mode same as 
Append mode.
-        if (Utils.isTesting) {
-          writeBuilder.buildForStreaming()
-        } else {
-          throw new IllegalArgumentException(
-            "Data source v2 streaming sinks does not support Update mode.")
-        }
+        require(writeBuilder.isInstanceOf[SupportsStreamingUpdate],
+          table.name + " does not support Update mode.")
+        
writeBuilder.asInstanceOf[SupportsStreamingUpdate].update().buildForStreaming()
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index e471e6c..1e64021 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, 
Table, TableCapabi
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, 
SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider, 
SupportsStreamingUpdate}
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, 
DataSourceRegister}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -73,11 +73,12 @@ object ConsoleTable extends Table with SupportsWrite {
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    new WriteBuilder with SupportsTruncate {
+    new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
       private val inputSchema: StructType = info.schema()
 
-      // Do nothing for truncate. Console sink is special that it just prints 
all the records.
+      // Do nothing for truncate/update. Console sink is special and it just 
prints all the records.
       override def truncate(): WriteBuilder = this
+      override def update(): WriteBuilder = this
 
       override def buildForStreaming(): StreamingWrite = {
         assert(inputSchema != null)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index ba54c85..57a73c7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, 
Table, TableCapabi
 import org.apache.spark.sql.connector.write.{DataWriter, LogicalWriteInfo, 
PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import 
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, 
StreamingWrite}
 import org.apache.spark.sql.execution.python.PythonForeachWriter
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -54,12 +55,13 @@ case class ForeachWriterTable[T](
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    new WriteBuilder with SupportsTruncate {
+    new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
       private var inputSchema: StructType = info.schema()
 
-      // Do nothing for truncate. Foreach sink is special that it just 
forwards all the records to
-      // ForeachWriter.
+      // Do nothing for truncate/update. Foreach sink is special and it just 
forwards all the
+      // records to ForeachWriter.
       override def truncate(): WriteBuilder = this
+      override def update(): WriteBuilder = this
 
       override def buildForStreaming(): StreamingWrite = {
         new StreamingWrite {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
index deab42b..03ebbb9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, 
Table, TableCapabi
 import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, 
LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, 
WriterCommitMessage}
 import 
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, 
StreamingWrite}
 import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -53,7 +54,7 @@ class MemorySink extends Table with SupportsWrite with 
Logging {
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    new WriteBuilder with SupportsTruncate {
+    new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
       private var needTruncate: Boolean = false
       private val inputSchema: StructType = info.schema()
 
@@ -62,6 +63,9 @@ class MemorySink extends Table with SupportsWrite with 
Logging {
         this
       }
 
+      // The in-memory sink treats update as append.
+      override def update(): WriteBuilder = this
+
       override def buildForStreaming(): StreamingWrite = {
         new MemoryStreamingWrite(MemorySink.this, inputSchema, needTruncate)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to