This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a1391cf [SPARK-31706][SQL] add back the support of streaming update
mode
a1391cf is described below
commit a1391cf56b3d3c98960e126fb1cf7811a967d74c
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed May 20 03:45:13 2020 +0000
[SPARK-31706][SQL] add back the support of streaming update mode
### What changes were proposed in this pull request?
This PR adds a private `WriteBuilder` mixin trait:
`SupportsStreamingUpdate`, so that the builtin v2 streaming sinks can still
support the update mode.
Note: the trait is private because we don't have a proper design for a public
API yet. I didn't take the proposal in
https://github.com/apache/spark/pull/23702#discussion_r258593059 because we may
want something more general, such as updating by an expression like `key1 = key2 + 10`.
### Why are the changes needed?
In Spark 2.4, all builtin v2 streaming sinks support all streaming output
modes, and v2 sinks are enabled by default; see
https://issues.apache.org/jira/browse/SPARK-22911
Going back to v1 sinks is too risky for 3.0, so I propose adding a
private trait that fixes the builtin v2 sinks and keeps backward compatibility.
### Does this PR introduce _any_ user-facing change?
Yes. All the builtin v2 streaming sinks now support all streaming output
modes, the same as in Spark 2.4.
### How was this patch tested?
Existing tests.
Closes #28523 from cloud-fan/update.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 34414acfa36d1a91cd6a64761625c8b0bd90c0a7)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/kafka010/KafkaSourceProvider.scala | 9 +++++---
.../connector/SupportsStreamingUpdate.scala | 26 ++++++++++++++++++++++
.../datasources/noop/NoopDataSource.scala | 6 +++--
.../sql/execution/streaming/StreamExecution.scala | 12 ++++------
.../spark/sql/execution/streaming/console.scala | 7 +++---
.../streaming/sources/ForeachWriterTable.scala | 8 ++++---
.../sql/execution/streaming/sources/memory.scala | 6 ++++-
7 files changed, 54 insertions(+), 20 deletions(-)
diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index a5e5d01..ede58bd 100644
---
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream,
MicroBatchStream}
-import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo,
WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo,
SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.{Sink, Source}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider,
SupportsStreamingUpdate}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -394,7 +394,7 @@ private[kafka010] class KafkaSourceProvider extends
DataSourceRegister
() => new KafkaScan(options)
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- new WriteBuilder {
+ new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
private val options = info.options
private val inputSchema: StructType = info.schema()
private val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
@@ -410,6 +410,9 @@ private[kafka010] class KafkaSourceProvider extends
DataSourceRegister
assert(inputSchema != null)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
+
+ override def truncate(): WriteBuilder = this
+ override def update(): WriteBuilder = this
}
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala
new file mode 100644
index 0000000..32be74a
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsStreamingUpdate.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.write.WriteBuilder
+
+// An internal `WriteBuilder` mixin to support UPDATE streaming output mode.
+// TODO: design an official API for streaming output mode UPDATE.
+trait SupportsStreamingUpdate extends WriteBuilder {
+ def update(): WriteBuilder
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 851cc51..8a6c4dc 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table,
TableCapability}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter,
DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate,
WriteBuilder, WriterCommitMessage}
import
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory,
StreamingWrite}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider,
SupportsStreamingUpdate}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -52,8 +52,10 @@ private[noop] object NoopTable extends Table with
SupportsWrite {
}
}
-private[noop] object NoopWriteBuilder extends WriteBuilder with
SupportsTruncate {
+private[noop] object NoopWriteBuilder extends WriteBuilder
+ with SupportsTruncate with SupportsStreamingUpdate {
override def truncate(): WriteBuilder = this
+ override def update(): WriteBuilder = this
override def buildForBatch(): BatchWrite = NoopBatchWrite
override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9b1951a..18fe38c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -629,14 +630,9 @@ abstract class StreamExecution(
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
case Update =>
- // Although no v2 sinks really support Update mode now, but during
tests we do want them
- // to pretend to support Update mode, and treat Update mode same as
Append mode.
- if (Utils.isTesting) {
- writeBuilder.buildForStreaming()
- } else {
- throw new IllegalArgumentException(
- "Data source v2 streaming sinks does not support Update mode.")
- }
+ require(writeBuilder.isInstanceOf[SupportsStreamingUpdate],
+ table.name + " does not support Update mode.")
+
writeBuilder.asInstanceOf[SupportsStreamingUpdate].update().buildForStreaming()
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index e471e6c..1e64021 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite,
Table, TableCapabi
import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.internal.connector.{SimpleTableProvider,
SupportsStreamingUpdate}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider,
DataSourceRegister}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -73,11 +73,12 @@ object ConsoleTable extends Table with SupportsWrite {
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- new WriteBuilder with SupportsTruncate {
+ new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
private val inputSchema: StructType = info.schema()
- // Do nothing for truncate. Console sink is special that it just prints
all the records.
+ // Do nothing for truncate/update. Console sink is special and it just
prints all the records.
override def truncate(): WriteBuilder = this
+ override def update(): WriteBuilder = this
override def buildForStreaming(): StreamingWrite = {
assert(inputSchema != null)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index ba54c85..57a73c7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite,
Table, TableCapabi
import org.apache.spark.sql.connector.write.{DataWriter, LogicalWriteInfo,
PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory,
StreamingWrite}
import org.apache.spark.sql.execution.python.PythonForeachWriter
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
import org.apache.spark.sql.types.StructType
/**
@@ -54,12 +55,13 @@ case class ForeachWriterTable[T](
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- new WriteBuilder with SupportsTruncate {
+ new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
private var inputSchema: StructType = info.schema()
- // Do nothing for truncate. Foreach sink is special that it just
forwards all the records to
- // ForeachWriter.
+ // Do nothing for truncate/update. Foreach sink is special and it just
forwards all the
+ // records to ForeachWriter.
override def truncate(): WriteBuilder = this
+ override def update(): WriteBuilder = this
override def buildForStreaming(): StreamingWrite = {
new StreamingWrite {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
index deab42b..03ebbb9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite,
Table, TableCapabi
import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory,
LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder,
WriterCommitMessage}
import
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory,
StreamingWrite}
import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.internal.connector.SupportsStreamingUpdate
import org.apache.spark.sql.types.StructType
/**
@@ -53,7 +54,7 @@ class MemorySink extends Table with SupportsWrite with
Logging {
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- new WriteBuilder with SupportsTruncate {
+ new WriteBuilder with SupportsTruncate with SupportsStreamingUpdate {
private var needTruncate: Boolean = false
private val inputSchema: StructType = info.schema()
@@ -62,6 +63,9 @@ class MemorySink extends Table with SupportsWrite with
Logging {
this
}
+ // The in-memory sink treats update as append.
+ override def update(): WriteBuilder = this
+
override def buildForStreaming(): StreamingWrite = {
new MemoryStreamingWrite(MemorySink.this, inputSchema, needTruncate)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]