This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0e6e15ca633 [SPARK-45080][SS] Explicitly call out support for columnar
in DSv2 streaming data sources
0e6e15ca633 is described below
commit 0e6e15ca6331d37a6c38c970556903c6df5d5dfb
Author: Jungtaek Lim <[email protected]>
AuthorDate: Thu Sep 7 19:42:58 2023 +0900
[SPARK-45080][SS] Explicitly call out support for columnar in DSv2
streaming data sources
### What changes were proposed in this pull request?
This PR proposes to override `Scan.columnarSupportMode` for the DSv2 streaming
data sources. None of them supports columnar output. This applies
[SPARK-44505](https://issues.apache.org/jira/browse/SPARK-44505) to the DSv2
streaming data sources.
The rationale is explained in the next section.
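Purely as an illustration (not code from this commit), a scan for a hypothetical
third-party DSv2 streaming source could opt out of columnar support in the same
way; `MyStreamingScan` and its constructor parameters are made up for the example:

```scala
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

// Hypothetical row-based streaming scan. Declaring UNSUPPORTED lets the planner
// decide row-vs-columnar without falling back to the PARTITION_DEFINED default,
// which would evaluate inputPartitions (and thus planInputPartitions) just for
// that decision.
class MyStreamingScan(schema: StructType, stream: MicroBatchStream) extends Scan {
  override def readSchema(): StructType = schema

  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = stream

  // Same override as the built-in sources touched in this commit.
  override def columnarSupportMode(): Scan.ColumnarSupportMode =
    Scan.ColumnarSupportMode.UNSUPPORTED
}
```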
### Why are the changes needed?
The default value of `Scan.columnarSupportMode` is `PARTITION_DEFINED`,
which requires `inputPartitions` to be called/evaluated. That can happen
multiple times during planning.
In `MicroBatchScanExec`, we define `inputPartitions` as a lazy val so that
`inputPartitions`, which calls `MicroBatchStream.planInputPartitions`, is not
evaluated more than once per instance. But we missed that there is no
guarantee the instance itself is initialized only once (although the actual
execution happens only once) - for example, `executedPlan` clones the plan
(internally we call the constructor to make a deep copy of the node), and
`explain` (internally called to build a SQL execution start event [...]
I have observed `MicroBatchStream.planInputPartitions` being called 4 times per
microbatch, which can be concerning if the overhead of `planInputPartitions` is
non-trivial, especially for Kafka.
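To make the caveat concrete, here is a minimal sketch (not Spark code; the
`FakeScanExec` class and its field are invented for the example) showing why a
lazy val only memoizes per instance and does not survive copying the node:

```scala
// A lazy val caches its value per *instance*, so cloning the node
// (analogous to executedPlan/explain making a deep copy of the plan)
// re-runs the expensive computation in the copy.
case class FakeScanExec(label: String) {
  lazy val inputPartitions: Seq[Int] = {
    println(s"planInputPartitions called for $label") // expensive in real sources, e.g. Kafka
    Seq(1, 2, 3)
  }
}

object LazyValCopyDemo extends App {
  val node = FakeScanExec("original")
  node.inputPartitions   // prints once
  node.inputPartitions   // memoized, no second print

  val cloned = node.copy() // a fresh instance, like a cloned plan node
  cloned.inputPartitions   // prints again: the lazy val is re-evaluated
}
```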
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42823 from HeartSaVioR/SPARK-45080.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala | 3 +++
.../main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 3 +++
.../sql/execution/streaming/sources/RatePerMicroBatchProvider.scala | 3 +++
.../spark/sql/execution/streaming/sources/RateStreamProvider.scala | 3 +++
.../sql/execution/streaming/sources/TextSocketSourceProvider.scala | 3 +++
.../scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 4 ++--
6 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index de78992533b..d9e3a1256ea 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -525,6 +525,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def supportedCustomMetrics(): Array[CustomMetric] = {
Array(new OffsetOutOfRangeMetric, new DataLossMetric)
}
+
+ override def columnarSupportMode(): Scan.ColumnarSupportMode =
+ Scan.ColumnarSupportMode.UNSUPPORTED
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 34076f26fe8..732eaa8d783 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -139,6 +139,9 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
stream.asInstanceOf[ContinuousStream]
}
+
+ override def columnarSupportMode(): Scan.ColumnarSupportMode =
+ Scan.ColumnarSupportMode.UNSUPPORTED
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index 41878a6a549..17cc1860fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -111,6 +111,9 @@ class RatePerMicroBatchTable(
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
throw new UnsupportedOperationException("continuous mode is not supported!")
}
+
+ override def columnarSupportMode(): Scan.ColumnarSupportMode =
+ Scan.ColumnarSupportMode.UNSUPPORTED
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index bf2cc770d79..24e283f4ad6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -100,6 +100,9 @@ class RateStreamTable(
override def toContinuousStream(checkpointLocation: String): ContinuousStream =
new RateStreamContinuousStream(rowsPerSecond, numPartitions)
+
+ override def columnarSupportMode(): Scan.ColumnarSupportMode =
+ Scan.ColumnarSupportMode.UNSUPPORTED
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index 1ab88cd41d8..e4251cc7d39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -98,6 +98,9 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
new TextSocketContinuousStream(host, port, numPartitions, options)
}
+
+ override def columnarSupportMode(): Scan.ColumnarSupportMode =
+ Scan.ColumnarSupportMode.UNSUPPORTED
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4a6325eb060..c3729d50ed0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -329,10 +329,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
assert(progress.processedRowsPerSecond === 4.0)
assert(progress.durationMs.get("latestOffset") === 50)
- assert(progress.durationMs.get("queryPlanning") === 100)
+ assert(progress.durationMs.get("queryPlanning") === 0)
assert(progress.durationMs.get("walCommit") === 0)
assert(progress.durationMs.get("commitOffsets") === 0)
- assert(progress.durationMs.get("addBatch") === 350)
+ assert(progress.durationMs.get("addBatch") === 450)
assert(progress.durationMs.get("triggerExecution") === 500)
assert(progress.sources.length === 1)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]