This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 89167c0ec1d4 [SPARK-53915] Add RealTimeScanExec and ability to execute
long running batches
89167c0ec1d4 is described below
commit 89167c0ec1d4e530bce55a3efb1f804a221003e2
Author: Jerry Peng <[email protected]>
AuthorDate: Mon Oct 27 19:08:03 2025 -0700
[SPARK-53915] Add RealTimeScanExec and ability to execute long running
batches
### What changes were proposed in this pull request?
The following changes are added to support running queries in Real-time Mode:
1. RealTimeScanExec. This is a new scan node for Real-time Mode that enables
queries to run batches for a fixed time duration. It also allows us to collect,
at the end of the batch, the offsets that tasks have processed up to.
2. Offset management changes needed for Real-time Mode. In Real-time Mode,
offsets that represent the points tasks have processed up to are collected
at the end of a batch via RealTimeScanExec and sent to the driver (see the
sketch after this list). The driver then writes the offsets to the offset log
to persist / checkpoint the progress of the batch. The commit log entry for the
batch is also written afterwards to ensure compatibility with existing
streaming queries. This differs from existing streaming qu [...]
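To make the offset flow in point 2 concrete, below is a simplified,
self-contained sketch of the driver-side merge step. The names mirror
PartitionOffsetWithIndex and SupportsRealTimeMode from this diff, but the types
here are stand-ins rather than the real Spark APIs, and endOffsetForBatch is a
hypothetical helper (the actual logic lives in RealTimeStreamScanExec and
MicroBatchExecution in the diff below).

    // Simplified stand-ins for the classes touched in this diff.
    trait PartitionOffset
    case class PartitionOffsetWithIndex(index: Long, partitionOffset: PartitionOffset)

    trait SupportsRealTimeMode {
      // The source merges the per-partition offsets reported by tasks into one end offset.
      def mergeOffsets(offsets: Array[PartitionOffset]): PartitionOffset
    }

    object RealTimeOffsetFlowSketch {
      // Driver-side step at the end of a batch: order the collected (index, offset) pairs
      // by partition index and let the source merge them. The merged offset is then
      // written to the offset log, followed by the commit log.
      def endOffsetForBatch(
          collected: Seq[PartitionOffsetWithIndex],
          source: SupportsRealTimeMode): PartitionOffset = {
        val sortedPartitionOffsets = collected.sortBy(_.index).map(_.partitionOffset).toArray
        source.mergeOffsets(sortedPartitionOffsets)
      }
    }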
### Why are the changes needed?
With the changes in this PR, simple, single-stage, stateless queries can be
executed in Real-time Mode: we can run queries that read from a MemorySource
(LowLatencyMemorySource), do some processing, and write the result to a
MemorySink (ContinuousMemorySink), as in the sketch below.
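For example, a minimal sketch modeled on the tests added in this PR
(SimpleRealTimeModeSketchSuite is a hypothetical name; the harness, source, and
sink are the ones introduced by this change):

    package org.apache.spark.sql.streaming

    import org.apache.spark.sql.execution.streaming.LowLatencyMemoryStream
    import org.apache.spark.sql.execution.streaming.sources.ContinuousMemorySink

    // Uses the StreamRealTimeModeSuiteBase harness added in this PR, which sets a
    // RealTimeTrigger as the default trigger for StartStream().
    class SimpleRealTimeModeSketchSuite extends StreamRealTimeModeSuiteBase {
      import testImplicits._

      test("map over a low-latency memory source") {
        val inputData = LowLatencyMemoryStream.singlePartition[Int]
        val mapped = inputData.toDS().map(_ + 1)

        testStream(mapped, OutputMode.Update, Map.empty, new ContinuousMemorySink())(
          AddData(inputData, 1, 2, 3),
          StartStream(),        // runs with the suite's default real-time trigger
          CheckAnswer(2, 3, 4), // results arrive before the long-running batch ends
          StopStream
        )
      }
    }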
### Does this PR introduce _any_ user-facing change?
Yes, adds functionality to run queries in Real-time Mode
### How was this patch tested?
Many tests are added in this PR. Additional tests will be added in a
subsequent PR to limit the size of this PR.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52620 from jerrypeng/SPARK-53915.
Authored-by: Jerry Peng <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 29 ++
.../main/scala/org/apache/spark/util/Clock.scala | 2 +-
.../datasources/v2/DataSourceV2Relation.scala | 3 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 7 +
.../connector/PartitionOffsetWithIndex.scala} | 27 +-
.../spark/sql/classic/StreamingQueryManager.scala | 21 ++
.../datasources/v2/DataSourceV2Strategy.scala | 30 ++-
.../datasources/v2/RealTimeStreamScanExec.scala | 133 +++++++++-
.../streaming/runtime/MicroBatchExecution.scala | 221 ++++++++++++++--
.../streaming/runtime/ProgressReporter.scala | 15 ++
.../sql/streaming/StreamRealTimeModeSuite.scala | 294 ++++++++++++++++++++-
.../streaming/StreamRealTimeModeSuiteBase.scala | 67 +++++
.../apache/spark/sql/streaming/StreamTest.scala | 79 +++++-
.../util/GlobalSingletonManualClock.scala | 138 ++++++++++
14 files changed, 1012 insertions(+), 54 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index c3f2c49a446b..a3d2d53f5fad 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3850,6 +3850,12 @@
],
"sqlState" : "42601"
},
+ "INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL" : {
+ "message" : [
+ "The real-time trigger interval is set to <interval> ms. This is less
than the <minBatchDuration> ms minimum specified by
spark.sql.streaming.realTimeMode.minBatchDuration."
+ ],
+ "sqlState" : "22023"
+ },
"INVALID_SUBQUERY_EXPRESSION" : {
"message" : [
"Invalid subquery:"
@@ -5552,6 +5558,29 @@
],
"sqlState" : "XXKST"
},
+ "STREAMING_REAL_TIME_MODE" : {
+ "message" : [
+ "Streaming real-time mode has the following limitation:"
+ ],
+ "subClass" : {
+ "ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED" : {
+ "message" : [
+ "Async progress tracking is not supported in real-time mode. Set
option asyncProgressTrackingEnabled to false and retry your query."
+ ]
+ },
+ "IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED" : {
+ "message" : [
+ "Real-time mode does not support union on two or more identical
streaming data sources in a single query. This includes scenarios such as
referencing the same source DataFrame more than once, or using two data sources
with identical configurations for some sources. For Kafka, avoid reusing the
same DataFrame and create different ones. Sources provided in the query:
<sources>"
+ ]
+ },
+ "INPUT_STREAM_NOT_SUPPORTED" : {
+ "message" : [
+ "The input stream <className> is not supported in Real-time Mode."
+ ]
+ }
+ },
+ "sqlState" : "0A000"
+ },
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
"message" : [
"Streaming stateful operator name does not match with the operator in
state metadata. This likely to happen when user adds/removes/changes stateful
operator of existing streaming query.",
diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala
b/core/src/main/scala/org/apache/spark/util/Clock.scala
index e0cb3f4188e6..9eef482af357 100644
--- a/core/src/main/scala/org/apache/spark/util/Clock.scala
+++ b/core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -58,7 +58,7 @@ private[spark] trait Clock {
/**
* A clock backed by the actual time from the OS as reported by the `System`
API.
*/
-private[spark] class SystemClock extends Clock {
+private[spark] class SystemClock extends Clock with Serializable {
val minPollTime = 25L
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 7e7990c317aa..089452176b7d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -188,7 +188,8 @@ case class StreamingDataSourceV2Relation(
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap,
- metadataPath: String)
+ metadataPath: String,
+ realTimeModeDuration: Option[Long] = None)
extends DataSourceV2RelationBase(table, output, catalog, identifier,
options) {
override def isStreaming: Boolean = true
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 330a6499c5c7..71a01d4c0700 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3017,6 +3017,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val STREAMING_REAL_TIME_MODE_MIN_BATCH_DURATION = buildConf(
+ "spark.sql.streaming.realTimeMode.minBatchDuration")
+ .doc("The minimum long-running batch duration in milliseconds for
real-time mode.")
+ .version("4.1.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(5000)
+
val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like `${var}`,
`${system:var}`, " +
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PartitionOffsetWithIndex.scala
similarity index 62%
copy from
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
copy to
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PartitionOffsetWithIndex.scala
index 3614f9159030..9db9bd2ac124 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/PartitionOffsetWithIndex.scala
@@ -14,26 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.spark.sql.internal.connector;
-package org.apache.spark.sql.execution.datasources.v2
+import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
-import org.apache.spark.util.{Clock, SystemClock}
-
-/* The singleton object to control the time in testing */
-object LowLatencyClock {
- private var clock: Clock = new SystemClock
-
- def getClock: Clock = clock
-
- def getTimeMillis(): Long = {
- clock.getTimeMillis()
- }
-
- def waitTillTime(targetTime: Long): Unit = {
- clock.waitTillTime(targetTime)
- }
-
- def setClock(inputClock: Clock): Unit = {
- clock = inputClock
- }
-}
+/**
+ * Internal class for real time mode to pass partition offset from executors
to the driver.
+ */
+private[sql] case class PartitionOffsetWithIndex(index: Long, partitionOffset:
PartitionOffset);
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
index 0470f3b20ecc..bef09703025e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala
@@ -24,6 +24,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import scala.jdk.CollectionConverters._
+import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{CLASS_NAME, QUERY_ID, RUN_ID}
@@ -189,6 +190,21 @@ class StreamingQueryManager private[sql] (
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ val minBatchDuration =
+
sparkSession.conf.get(SQLConf.STREAMING_REAL_TIME_MODE_MIN_BATCH_DURATION)
+ val realTimeTrigger = trigger.asInstanceOf[RealTimeTrigger]
+ if (realTimeTrigger.batchDurationMs < minBatchDuration) {
+ throw new SparkIllegalArgumentException(
+ errorClass = "INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL",
+ messageParameters = Map(
+ "interval" -> realTimeTrigger.batchDurationMs.toString,
+ "minBatchDuration" -> minBatchDuration.toString
+ )
+ )
+ }
+ }
+
val dataStreamWritePlan = WriteToStreamStatement(
userSpecifiedName,
userSpecifiedCheckpointLocation,
@@ -216,6 +232,11 @@ class StreamingQueryManager private[sql] (
analyzedStreamWritePlan))
case _ =>
val microBatchExecution = if (useAsyncProgressTracking(extraOptions)) {
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ throw new SparkIllegalArgumentException(
+ errorClass =
"STREAMING_REAL_TIME_MODE.ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED"
+ )
+ }
new AsyncProgressTrackingMicroBatchExecution(
sparkSession,
trigger,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 12b8c75adaa7..9c624d951a76 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.EXPR
import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier,
ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
@@ -39,7 +39,7 @@ import
org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference,
LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not =>
V2Not, Or => V2Or, Predicate}
import org.apache.spark.sql.connector.read.LocalScan
-import org.apache.spark.sql.connector.read.streaming.{ContinuousStream,
MicroBatchStream}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream,
MicroBatchStream, SupportsRealTimeMode}
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec,
LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec,
SparkPlan, SparkStrategy => Strategy}
@@ -179,10 +179,28 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
if r.startOffset.isDefined && r.endOffset.isEmpty =>
- val continuousStream = r.stream.asInstanceOf[ContinuousStream]
- val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream,
r.startOffset.get)
- // initialize partitions
- scanExec.inputPartitions
+ val scanExec = if (r.relation.realTimeModeDuration.isDefined) {
+ if (!r.stream.isInstanceOf[SupportsRealTimeMode]) {
+ throw new SparkIllegalArgumentException(
+ errorClass =
"STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED",
+ messageParameters = Map("className" -> r.stream.getClass.getName)
+ )
+ }
+ val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
+ new RealTimeStreamScanExec(
+ r.output,
+ r.scan,
+ microBatchStream,
+ r.startOffset.get,
+ r.relation.realTimeModeDuration.get
+ )
+ } else {
+ val continuousStream = r.stream.asInstanceOf[ContinuousStream]
+ val s = ContinuousScanExec(r.output, r.scan, continuousStream,
r.startOffset.get)
+ // initialize partitions
+ s.inputPartitions
+ s
+ }
// Add a Project here to make sure we produce unsafe rows.
DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec,
!scanExec.supportsColumnar) :: Nil
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
index 3614f9159030..c4e072f184e6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala
@@ -17,7 +17,19 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.util.{Clock, SystemClock}
+import java.util.Objects
+
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
SortOrder}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader,
PartitionReaderFactory, Scan}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream,
Offset, SupportsRealTimeMode, SupportsRealTimeRead}
+import
org.apache.spark.sql.connector.read.streaming.SupportsRealTimeRead.RecordStatus
+import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex
+import org.apache.spark.util.{Clock, CollectionAccumulator, SystemClock}
+import org.apache.spark.util.ArrayImplicits._
/* The singleton object to control the time in testing */
object LowLatencyClock {
@@ -37,3 +49,122 @@ object LowLatencyClock {
clock = inputClock
}
}
+
+/**
+ * A wrapping reader that turns a PartitionReader extending SupportsRealTimeRead into a
+ * normal PartitionReader, honors the task termination time `lowLatencyEndTime`, and
+ * reports the end offsets to `endOffsets` at the end of the batch.
+ */
+case class LowLatencyReaderWrap(
+ reader: SupportsRealTimeRead[InternalRow],
+ lowLatencyEndTime: Long,
+ endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+ extends PartitionReader[InternalRow] {
+
+ override def next(): Boolean = {
+ val curTime = LowLatencyClock.getTimeMillis()
+ val ret = if (curTime >= lowLatencyEndTime) {
+ RecordStatus.newStatusWithoutArrivalTime(false)
+ } else {
+ reader.nextWithTimeout(lowLatencyEndTime - curTime)
+ }
+
+ if (!ret.hasRecord) {
+ // The way of using TaskContext.get().partitionId() to map to a partition
+ // may be fragile as it relies on thread locals.
+ endOffsets.add(
+ new PartitionOffsetWithIndex(TaskContext.get().partitionId(),
reader.getOffset)
+ )
+ }
+ ret.hasRecord
+ }
+
+ override def get(): InternalRow = {
+ reader.get()
+ }
+
+ override def close(): Unit = {}
+}
+
+/**
+ * Wrapper factory that creates a LowLatencyReaderWrap from a reader that implements
+ * SupportsRealTimeRead.
+ */
+case class LowLatencyReaderFactoryWrap(
+ partitionReaderFactory: PartitionReaderFactory,
+ lowLatencyEndTime: Long,
+ endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+ extends PartitionReaderFactory
+ with Logging {
+ override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
+ val rowReader = partitionReaderFactory.createReader(partition)
+ assert(rowReader.isInstanceOf[SupportsRealTimeRead[InternalRow]])
+ logInfo(
+ log"Creating low latency PartitionReader, stopping at " +
+ log"${MDC(LogKeys.TO_TIME, lowLatencyEndTime)}"
+ )
+ LowLatencyReaderWrap(
+ rowReader.asInstanceOf[SupportsRealTimeRead[InternalRow]],
+ lowLatencyEndTime,
+ endOffsets
+ )
+ }
+}
+
+/**
+ * Physical plan node for Real-time Mode to scan/read data from a data source.
+ */
+case class RealTimeStreamScanExec(
+ output: Seq[Attribute],
+ @transient scan: Scan,
+ @transient stream: MicroBatchStream,
+ @transient start: Offset,
+ batchDurationMs: Long)
+ extends DataSourceV2ScanExecBase {
+
+ assert(stream.isInstanceOf[SupportsRealTimeMode])
+
+ override def keyGroupedPartitioning: Option[Seq[Expression]] = None
+
+ override def ordering: Option[Seq[SortOrder]] = None
+
+ val endOffsetsAccumulator: CollectionAccumulator[PartitionOffsetWithIndex] =
{
+
SparkContext.getActive.map(_.collectionAccumulator[PartitionOffsetWithIndex]).get
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case other: RealTimeStreamScanExec =>
+ this.stream == other.stream &&
+ this.batchDurationMs == other.batchDurationMs
+ case _ => false
+ }
+
+ override def hashCode(): Int = Objects.hashCode(stream, batchDurationMs)
+
+ override lazy val readerFactory: PartitionReaderFactory =
stream.createReaderFactory()
+
+ override lazy val inputPartitions: Seq[InputPartition] = {
+ val lls = stream.asInstanceOf[SupportsRealTimeMode]
+ assert(lls != null)
+ lls.planInputPartitions(start).toImmutableArraySeq
+ }
+
+ override def simpleString(maxFields: Int): String =
+ s"${super.simpleString(maxFields)} [batchDurationMs=${batchDurationMs}ms]"
+
+ override lazy val inputRDD: RDD[InternalRow] = {
+
+ val inputRDD = new DataSourceRDD(
+ sparkContext,
+ partitions,
+ LowLatencyReaderFactoryWrap(
+ readerFactory,
+ LowLatencyClock.getTimeMillis() + batchDurationMs,
+ endOffsetsAccumulator
+ ),
+ supportsColumnar,
+ customMetrics
+ )
+ postDriverMetrics()
+ inputRDD
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index f2760c8914bb..d4f9dc8cea93 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -19,12 +19,14 @@ package org.apache.spark.sql.execution.streaming.runtime
import scala.collection.mutable.{Map => MutableMap}
import scala.collection.mutable
+import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
+import org.apache.spark.{SparkIllegalArgumentException,
SparkIllegalStateException}
import org.apache.spark.internal.LogKeys
-import org.apache.spark.internal.LogKeys.BATCH_ID
+import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp,
FileSourceMetadataAttribute, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate,
DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState,
FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation,
LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState,
TransformWithStateInPySpark}
@@ -34,12 +36,12 @@ import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.classic.{Dataset, SparkSession}
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
TableCapability}
-import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset
=> OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl,
SupportsTriggerAvailableNow}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset
=> OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl,
SupportsRealTimeMode, SupportsTriggerAvailableNow}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation,
StreamWriterCommitProgress, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset,
OneTimeTrigger, ProcessingTimeTrigger, Sink, Source}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
RealTimeStreamScanExec, StreamingDataSourceV2Relation,
StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress,
WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset,
OneTimeTrigger, ProcessingTimeTrigger, RealTimeTrigger, Sink, Source,
StreamingQueryPlanTraverseHelper}
import
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager,
CommitMetadata, OffsetSeq, OffsetSeqMetadata}
import
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo,
StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
import
org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler
@@ -47,6 +49,7 @@ import
org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConst
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink,
WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
import org.apache.spark.sql.execution.streaming.state.{StateSchemaBroadcast,
StateStoreErrors}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.{Clock, Utils}
@@ -104,6 +107,7 @@ class MicroBatchExecution(
trigger match {
case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => SingleBatchExecutor()
+ case _: RealTimeTrigger =>
ProcessingTimeExecutor(ProcessingTimeTrigger(0), triggerClock)
case AvailableNowTrigger =>
// When the flag is enabled, Spark will wrap sources which do not
support
// Trigger.AvailableNow with wrapper implementation, so that
Trigger.AvailableNow can
@@ -200,7 +204,17 @@ class MicroBatchExecution(
val scan = table.newScanBuilder(options).build()
val stream = scan.toMicroBatchStream(metadataPath)
val relation = StreamingDataSourceV2Relation(
- table, output, catalog, identifier, options, metadataPath)
+ table,
+ output,
+ catalog,
+ identifier,
+ options,
+ metadataPath,
+ trigger match {
+ case RealTimeTrigger(duration) => Some(duration)
+ case _ => None
+ }
+ )
StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
})
} else if (v1.isEmpty) {
@@ -229,6 +243,19 @@ class MicroBatchExecution(
case r: StreamingDataSourceV2ScanRelation => r.stream
}
+ // Inform the sources that the query is running in real-time mode
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ sources.foreach{
+ case s: SupportsRealTimeMode =>
+ s.prepareForRealTimeMode()
+ case s =>
+ throw new SparkIllegalArgumentException(
+ errorClass = "STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED",
+ messageParameters = Map("className" -> s.getClass.getName)
+ )
+ }
+ }
+
// Initializing TriggerExecutor relies on `sources`, hence calling this
after initializing
// sources.
triggerExecutor = getTrigger()
@@ -263,6 +290,12 @@ class MicroBatchExecution(
case s => s -> ReadLimit.allAvailable()
}.toMap
}
+ if (trigger.isInstanceOf[RealTimeTrigger] && uniqueSources.size !=
sources.size) {
+ throw new SparkIllegalStateException(
+ errorClass =
s"STREAMING_REAL_TIME_MODE.IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED",
+ messageParameters = Map("sources" -> sources.mkString(", "))
+ )
+ }
// TODO (SPARK-27484): we should add the writing node before the plan is
analyzed.
sink match {
@@ -279,6 +312,15 @@ class MicroBatchExecution(
outputMode)
case s: Sink =>
+ // SinkV1 is not compatible with Real-Time Mode due to API limitations.
+ // SinkV1 does not support writing outputs row by row.
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ throw new SparkIllegalArgumentException(
+ errorClass = "STREAMING_REAL_TIME_MODE.SINK_NOT_SUPPORTED",
+ messageParameters = Map("className" -> s.getClass.getName)
+ )
+ }
+
WriteToMicroBatchDataSourceV1(
plan.catalogTable,
sink = s,
@@ -333,6 +375,29 @@ class MicroBatchExecution(
private def initializeExecution(
sparkSessionForStream: SparkSession): MicroBatchExecutionContext = {
+ var latestStartedBatch = offsetLog.getLatest()
+ val latestCommittedBatch = commitLog.getLatest()
+
+ val lastCommittedBatchId = latestCommittedBatch match {
+ case Some((batchId, _)) => batchId
+ case _ => -1L
+ }
+
+ // For a query running in Real-time Mode that fails after
+ // writing to offset log but before writing to commit log, we delete the
extra
+ // entries in offsetLog to sync up. Note that this also means async
checkpoint rollback handling
+ // is not compatible with Real-time Mode at this stage.
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ val lastOffsetLogBatchId = latestStartedBatch.map(_._1).getOrElse(-1L)
+ if (lastOffsetLogBatchId > lastCommittedBatchId) {
+ logWarning(log"Deleting extra entries in offset log to sync up with
commit log. " +
+ log"Last committed batch id = ${MDC(BATCH_ID,
lastCommittedBatchId)}, " +
+ log"last offset log batch id = ${MDC(CURRENT_BATCH_ID,
lastOffsetLogBatchId)}")
+ offsetLog.purgeAfter(lastCommittedBatchId)
+ latestStartedBatch = offsetLog.getLatest()
+ }
+ }
+
AcceptsLatestSeenOffsetHandler.setLatestSeenOffsetOnSources(
offsetLog.getLatest().map(_._2), sources)
@@ -427,7 +492,12 @@ class MicroBatchExecution(
// Record the trigger offset range for progress reporting *before*
processing the batch
execCtx.recordTriggerOffsets(
from = execCtx.startOffsets,
- to = execCtx.endOffsets,
+ to = if (trigger.isInstanceOf[RealTimeTrigger]) {
+ // We don't know endOffsets in real time mode here.
+ new StreamProgress(Map())
+ } else {
+ execCtx.endOffsets
+ },
latest = execCtx.latestOffsets)
// Remember whether the current batch has data or not. This will be
required later
@@ -634,12 +704,17 @@ class MicroBatchExecution(
* Returns true if there is any new data available to be processed.
*/
private def isNewDataAvailable(execCtx: MicroBatchExecutionContext): Boolean
= {
- execCtx.endOffsets.exists {
- case (source, available) =>
- execCtx.startOffsets
- .get(source)
- .map(committed => committed != available)
- .getOrElse(true)
+ // For real-time mode, we always assume there is new data and run the
batch.
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ true
+ } else {
+ execCtx.endOffsets.exists {
+ case (source, available) =>
+ execCtx.startOffsets
+ .get(source)
+ .map(committed => committed != available)
+ .getOrElse(true)
+ }
}
}
@@ -801,6 +876,19 @@ class MicroBatchExecution(
logDebug(s"Retrieving data from $source: $current -> $available")
Some(source -> batch.logicalPlan)
+ case (stream: MicroBatchStream, _) if
trigger.isInstanceOf[RealTimeTrigger] =>
+ if (!stream.isInstanceOf[SupportsRealTimeMode]) {
+ throw new SparkIllegalArgumentException(
+ errorClass =
"STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED",
+ messageParameters = Map("className" -> stream.getClass.getName)
+ )
+ }
+ val current = execCtx.startOffsets.get(stream).map {
+ off => stream.deserializeOffset(off.json)
+ }
+ val startOffset = current.getOrElse(stream.initialOffset)
+ Some(stream -> OffsetHolder(startOffset, None))
+
case (stream: MicroBatchStream, available)
if execCtx.startOffsets.get(stream).map(_ !=
available).getOrElse(true) =>
val current = execCtx.startOffsets.get(stream).map {
@@ -816,7 +904,7 @@ class MicroBatchExecution(
// To be compatible with the v1 source, the `newData` is represented
as a logical plan,
// while the `newData` of v2 source is just the start and end
offsets. Here we return a
// fake logical plan to carry the offsets.
- Some(stream -> OffsetHolder(startOffset, endOffset))
+ Some(stream -> OffsetHolder(startOffset, Some(endOffset)))
case _ => None
}
@@ -892,7 +980,7 @@ class MicroBatchExecution(
case r: StreamingDataSourceV2ScanRelation =>
mutableNewData.get(r.stream).map {
case OffsetHolder(start, end) =>
- r.copy(startOffset = Some(start), endOffset = Some(end))
+ r.copy(startOffset = Some(start), endOffset = end)
}.getOrElse {
// Don't track the source node which is known to produce zero rows.
LocalRelation(r.output, isStreaming = true)
@@ -966,6 +1054,12 @@ class MicroBatchExecution(
SQLExecution.withNewExecutionId(execCtx.executionPlan) {
sink match {
case s: Sink =>
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ throw new SparkIllegalStateException(
+ errorClass = "STREAMING_REAL_TIME_MODE.SINK_NOT_SUPPORTED",
+ messageParameters = Map("className" -> s.getClass.getName)
+ )
+ }
s.addBatch(execCtx.batchId, nextBatch)
// DSv2 write node has a mechanism to invalidate DSv2 relation,
but there is no
// corresponding one for DSv1. Given we have an information of
catalog table for sink,
@@ -997,13 +1091,23 @@ class MicroBatchExecution(
* checkpointing to offset log and any microbatch startup tasks.
*/
protected def markMicroBatchStart(execCtx: MicroBatchExecutionContext): Unit
= {
- if (!offsetLog.add(execCtx.batchId,
- execCtx.endOffsets.toOffsetSeq(sources, execCtx.offsetSeqMetadata))) {
- throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
- }
- logInfo(log"Committed offsets for batch ${MDC(LogKeys.BATCH_ID,
execCtx.batchId)}. " +
- log"Metadata ${MDC(LogKeys.OFFSET_SEQUENCE_METADATA,
execCtx.offsetSeqMetadata.toString)}")
+ if (!trigger.isInstanceOf[RealTimeTrigger]) {
+ if (!offsetLog.add(
+ execCtx.batchId,
+ execCtx.endOffsets.toOffsetSeq(sources, execCtx.offsetSeqMetadata)
+ )) {
+ throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
+ }
+
+ logInfo(
+ log"Committed offsets for batch ${MDC(LogKeys.BATCH_ID,
execCtx.batchId)}. " +
+ log"Metadata ${MDC(LogKeys.OFFSET_SEQUENCE_METADATA,
execCtx.offsetSeqMetadata.toString)}"
+ )
+ } else {
+ logInfo(log"Delay offset logging for batch ${MDC(BATCH_ID,
execCtx.batchId)} " +
+ log"in real time mode.")
+ }
}
/**
@@ -1109,6 +1213,72 @@ class MicroBatchExecution(
sparkSessionForStream.sessionState.conf)) {
updateStateStoreCkptId(execCtx, latestExecPlan)
}
+
+ var needSignalProgressLock = false
+ // In real-time mode, we delay the offset logging until the end of the batch.
+ // We first gather, from every RealTimeStreamScanExec (i.e. the tasks that execute
+ // a source partition), the offsets that the tasks have processed up to. We then
+ // merge the offsets and write them to the offset log.
+ if (trigger.isInstanceOf[RealTimeTrigger]) {
+ val execs = StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(lastExecution.executedPlan) {
+ case e: RealTimeStreamScanExec => e
+ }
+
+ val endOffsetMap = MutableMap[SparkDataStream, OffsetV2]()
+ execs.foreach { e =>
+ val lowLatencyExec = e.asInstanceOf[RealTimeStreamScanExec]
+ val accus: Seq[PartitionOffsetWithIndex] =
+ lowLatencyExec.endOffsetsAccumulator.value.asScala.toSeq
+ val sortedPartitionOffsets =
accus.sortBy(_.index).map(_.partitionOffset).toArray
+ val source = e.stream
+ val endOffset = source
+ .asInstanceOf[SupportsRealTimeMode]
+ .mergeOffsets(sortedPartitionOffsets)
+ endOffsetMap += (source -> endOffset)
+ }
+
+ assert(endOffsetMap.size == execs.size, "Identical sources exist in the
physical nodes" +
+ " which is not supported.")
+
+ execCtx.endOffsets ++= endOffsetMap
+ execCtx.recordEndOffsets(execCtx.endOffsets)
+ execCtx.recordTriggerOffsets(
+ from = execCtx.startOffsets,
+ to = execCtx.endOffsets,
+ latest = execCtx.latestOffsets
+ )
+ execCtx.reportTimeTaken("walCommit") {
+ if (!offsetLog.add(
+ execCtx.batchId,
+ execCtx.endOffsets.toOffsetSeq(sources, execCtx.offsetSeqMetadata)
+ )) {
+ throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
+ }
+ }
+ logInfo(
+ log"Committed offsets for batch ${MDC(LogKeys.BATCH_ID,
execCtx.batchId)}. Metadata " +
+ log"${MDC(LogKeys.OFFSET_SEQUENCE_METADATA,
execCtx.offsetSeqMetadata)}"
+ )
+ var shouldUpdate = true
+ sources.foreach { s =>
+ execCtx.startOffsets.get(s).foreach { prevOffsets =>
+ if (!prevOffsets.equals(endOffsetMap(s))) {
+ shouldUpdate = false
+ }
+ }
+ }
+ if (shouldUpdate) {
+ // To trigger processAllAvailable() return.
+ noNewData = true
+
+ // We could signal ProcessAllAvailable to finish here, however
+ // signaling after commit log will make it less likely that the caller
of
+ // ProcessAllAvailable() sees offset log written but not commit log.
+ needSignalProgressLock = true
+ }
+ }
+
execCtx.reportTimeTaken("commitOffsets") {
val stateStoreCkptId = if
(StatefulOperatorStateInfo.enableStateStoreCheckpointIds(
sparkSessionForStream.sessionState.conf)) {
@@ -1122,6 +1292,15 @@ class MicroBatchExecution(
}
}
committedOffsets ++= execCtx.endOffsets
+
+ // RealTime Mode deals with ProcessAllAvailable() differently. It sets
noNewData above
+ // when a batch ends, so we need to signal here. Non-Real-Time mode sets
the same flag
+ // in query planning phase.
+ if (needSignalProgressLock) {
+ withProgressLocked {
+ awaitProgressLockCondition.signalAll()
+ }
+ }
}
protected def cleanUpLastExecutedMicroBatch(execCtx:
MicroBatchExecutionContext): Unit = {
@@ -1153,6 +1332,6 @@ object MicroBatchExecution {
val BATCH_ID_KEY = "streaming.sql.batchId"
}
-case class OffsetHolder(start: OffsetV2, end: OffsetV2) extends LeafNode {
+case class OffsetHolder(start: OffsetV2, end: Option[OffsetV2]) extends
LeafNode {
override def output: Seq[Attribute] = Nil
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
index d02e992fc190..021822b211a2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream,
ReportsSinkMetrics, ReportsSourceMetrics, SparkDataStream}
import org.apache.spark.sql.execution.{QueryExecution,
StreamSourceAwareSparkPlan}
import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec,
StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
+import org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec
import
org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata
import
org.apache.spark.sql.execution.streaming.operators.stateful.{EventTimeWatermarkExec,
StateStoreWriter}
@@ -245,6 +246,14 @@ abstract class ProgressContext(
currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
}
+ /**
+ * Only used by Real-time Mode. In other cases, end offsets are determined
+ * in the batch planning phase, so they never need to be updated.
+ */
+ def recordEndOffsets(to: StreamProgress): Unit = {
+ currentTriggerEndOffsets = to.transform((_, v) => v.json)
+ }
+
/** Finalizes the trigger which did not execute a batch. */
def finishNoExecutionTrigger(lastExecutedEpochId: Long): Unit = {
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
@@ -509,6 +518,10 @@ abstract class ProgressContext(
val numRows =
s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
val source = s.stream
source -> numRows
+ case s: RealTimeStreamScanExec =>
+ val numRows =
s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+ val source = s.stream
+ source -> numRows
}
logDebug("Source -> # input rows\n\t" +
sourceToInputRowsTuples.mkString("\n\t"))
sumRows(sourceToInputRowsTuples)
@@ -558,6 +571,8 @@ abstract class ProgressContext(
// The physical node for DSv2 streaming source contains the
information of the source
// by itself, so leverage it.
Some(ep -> ep.stream)
+ case (_, ep: RealTimeStreamScanExec) =>
+ Some(ep -> ep.stream)
case (lp, ep) =>
logicalPlanLeafToSource.get(lp).map { source => ep -> source }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
index e21355430213..8f653fce4f30 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
@@ -18,12 +18,20 @@
package org.apache.spark.sql.streaming
import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.Duration
-import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
-class StreamRealTimeModeSuite extends StreamTest {
+import org.apache.spark.{SparkIllegalArgumentException,
SparkIllegalStateException}
+import org.apache.spark.sql.execution.streaming.{LowLatencyMemoryStream,
RealTimeTrigger}
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemorySink
+import org.apache.spark.sql.internal.SQLConf
+
+class StreamRealTimeModeSuite extends StreamRealTimeModeSuiteBase {
+ import testImplicits._
test("test trigger") {
def testTrigger(trigger: Trigger, actual: Long): Unit = {
@@ -98,4 +106,286 @@ class StreamRealTimeModeSuite extends StreamTest {
}
)
}
+
+ test("processAllAvailable") {
+ val inputData = LowLatencyMemoryStream.singlePartition[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+
+ testStream(mapped, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ AddData(inputData, 1, 2, 3),
+ StartStream(),
+ CheckAnswer(2, 3, 4),
+ AddData(inputData, 4, 5, 6),
+ CheckAnswer(2, 3, 4, 5, 6, 7),
+ AddData(inputData, 7),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8),
+ AddData(inputData, 10, 11),
+ ProcessAllAvailable(),
+ StopStream,
+ StartStream(),
+ CheckAnswer(2, 3, 4, 5, 6, 7, 8, 11, 12)
+ )
+ }
+
+ test("error: batch duration is set less than minimum") {
+ val inputData = LowLatencyMemoryStream.singlePartition[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+ val minBatchDuration =
+ spark.conf.get(SQLConf.STREAMING_REAL_TIME_MODE_MIN_BATCH_DURATION)
+ val ex = intercept[SparkIllegalArgumentException] {
+ testStream(mapped, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ StartStream(RealTimeTrigger(minBatchDuration - 1))
+ )
+ }
+ checkError(
+ ex,
+ "INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL",
+ parameters = Map(
+ "interval" -> (minBatchDuration - 1).toString,
+ "minBatchDuration" -> minBatchDuration.toString
+ )
+ )
+ }
+
+ test("environment check for real-time mode throws when the valid
configurations aren't set") {
+ val inputData = LowLatencyMemoryStream.singlePartition[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+
+ checkError(
+ intercept[SparkIllegalArgumentException] {
+ testStream(mapped, OutputMode.Update, Map(
+ "asyncProgressTrackingEnabled" -> "true"
+ ), new ContinuousMemorySink())(
+ StartStream()
+ )
+ },
+ "STREAMING_REAL_TIME_MODE.ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED"
+ )
+ }
+
+ test("error when unsupported source is used") {
+ val inputData = MemoryStream[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+
+ testStream(mapped, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ StartStream(),
+ ExpectFailure[SparkIllegalArgumentException] { ex =>
+ checkError(
+ ex.asInstanceOf[SparkIllegalArgumentException],
+ "STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED",
+ parameters =
+ Map("className" ->
"org.apache.spark.sql.execution.streaming.runtime.MemoryStream")
+ )
+ }
+ )
+ }
+
+ test("error on self union") {
+ val inputData = LowLatencyMemoryStream.singlePartition[Int].toDS()
+ val mapped = inputData.map(_ + 1)
+
+ val unioned = mapped
+ .union(inputData)
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+ .map(_ + 1)
+
+ testStream(unioned, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ StartStream(),
+ ExpectFailure[SparkIllegalStateException] { ex =>
+ checkError(
+ ex.asInstanceOf[SparkIllegalStateException],
+ "STREAMING_REAL_TIME_MODE.IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED",
+ parameters = Map("sources" ->
+ "MemoryStream\\[value#\\d+\\], MemoryStream\\[value#\\d+\\]"),
+ matchPVals = true
+ )
+ }
+ )
+ }
+}
+
+class StreamRealTimeModeWithManualClockSuite extends
StreamRealTimeModeManualClockSuiteBase {
+ import testImplicits._
+
+ test("simple map query") {
+ val inputData = LowLatencyMemoryStream[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+
+ testStream(mapped, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ AddData(inputData, 1, 2, 3),
+ StartStream(),
+ CheckAnswerWithTimeout(10000, 2, 3, 4),
+ AddData(inputData, 4, 5, 6),
+ // make sure we can output data before batch ends
+ CheckAnswerWithTimeout(10000, 2, 3, 4, 5, 6, 7),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(0),
+ AddData(inputData, 7),
+ CheckAnswerWithTimeout(10000, 2, 3, 4, 5, 6, 7, 8),
+ StopStream
+ )
+ }
+
+ test("simple map query with restarts") {
+ val inputData = LowLatencyMemoryStream[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+
+ testStream(mapped, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ StartStream(),
+ AddData(inputData, 1, 2, 3),
+ CheckAnswerWithTimeout(10000, 2, 3, 4),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(0),
+ StopStream,
+ AddData(inputData, 4, 5, 6),
+ StartStream(),
+ CheckAnswerWithTimeout(10000, 2, 3, 4, 5, 6, 7),
+ StopStream
+ )
+ }
+
+ test("simple map query switching between RTM and MBM") {
+ val inputData = LowLatencyMemoryStream[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+
+ testStream(mapped, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ StartStream(defaultTrigger),
+ AddData(inputData, 1, 2, 3),
+ CheckAnswerWithTimeout(10000, 2, 3, 4),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(0),
+ StopStream,
+ AddData(inputData, 4, 5, 6),
+ StartStream(Trigger.ProcessingTime(1000)),
+ CheckAnswerWithTimeout(10000, 2, 3, 4, 5, 6, 7),
+ WaitUntilBatchProcessed(1),
+ StopStream,
+ AddData(inputData, 7),
+ StartStream(defaultTrigger),
+ CheckAnswerWithTimeout(10000, 2, 3, 4, 5, 6, 7, 8),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(2),
+ StopStream
+ )
+ }
+
+ test("listener progress") {
+ val inputData = LowLatencyMemoryStream.singlePartition[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+
+ var expectedStartOffset: String = null
+ var expectedEndOffset = "{\"0\":3}"
+ var expectedNumInputRows = 3
+ val progressCalled = new AtomicInteger(0)
+ var exception: Option[Exception] = None
+
+ spark.streams.addListener(new StreamingQueryListener {
+ override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {}
+
+ override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
+ val progress: StreamingQueryProgress = event.progress
+ try {
+ assert(progress.sources(0).startOffset == expectedStartOffset,
"startOffset not expected")
+ assert(progress.sources(0).endOffset == expectedEndOffset,
"endOffset not expected")
+ assert(
+ progress.sources(0).numInputRows == expectedNumInputRows,
+ "numInputRows not expected")
+ } catch {
+ case ex: Exception =>
+ exception = Some(ex)
+ }
+ progressCalled.incrementAndGet()
+ }
+
+ override def onQueryTerminated(event:
StreamingQueryListener.QueryTerminatedEvent): Unit = {}
+ })
+
+ testStream(mapped, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ AddData(inputData, 1, 2, 3),
+ StartStream(),
+ CheckAnswerWithTimeout(60000, 2, 3, 4),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(0),
+ Execute { q =>
+ eventually(Timeout(streamingTimeout)) {
+ assert(progressCalled.get() == 1)
+ }
+ expectedEndOffset = "{\"0\":6}"
+ expectedStartOffset = "{\"0\":3}"
+ expectedNumInputRows = 3
+ },
+ AddData(inputData, 4, 5, 6),
+ CheckAnswerWithTimeout(10000, 2, 3, 4, 5, 6, 7),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(1),
+ Execute { q =>
+ eventually(Timeout(streamingTimeout)) {
+ assert(progressCalled.get() == 2)
+ }
+ expectedEndOffset = "{\"0\":7}"
+ expectedStartOffset = "{\"0\":6}"
+ expectedNumInputRows = 1
+ },
+ AddData(inputData, 7),
+ CheckAnswerWithTimeout(10000, 2, 3, 4, 5, 6, 7, 8),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(2),
+ StopStream
+ )
+ eventually(Timeout(streamingTimeout)) {
+ assert(progressCalled.get() == 3)
+ }
+ assert(!exception.isDefined, s"${exception}")
+ }
+
+ test("purge offsetLog when it doesn't match with the commit log") {
+ // Simulate the query failing after committing the offset log but before the commit log
+ // by manually deleting the last entry of the commit log.
+ val inputData = LowLatencyMemoryStream[Int](1)
+ val mapped = inputData.toDS().map(_ + 1)
+
+ var lastOffset = -1L
+
+ testStream(mapped, OutputMode.Update, Map.empty, new
ContinuousMemorySink())(
+ AddData(inputData, 1, 2, 3),
+ StartStream(defaultTrigger),
+ CheckAnswerWithTimeout(60000, 2, 3, 4),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(0),
+ AddData(inputData, 4, 5, 6),
+ CheckAnswerWithTimeout(60000, 2, 3, 4, 5, 6, 7),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(1),
+ AddData(inputData, 7),
+ CheckAnswerWithTimeout(60000, 2, 3, 4, 5, 6, 7, 8),
+ advanceRealTimeClock,
+ WaitUntilBatchProcessed(2),
+ StopStream,
+ Execute { q =>
+ // Delete the last committed batch from the commit log to simulate the
query fails
+ // between writing the offset log and the commit log.
+ val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+ val offset = q.offsetLog.getLatest().map(_._1).getOrElse(-1L)
+ assert(commit == offset)
+ q.commitLog.purgeAfter(commit - 1)
+ val commitAfterDelete =
q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+ assert(commitAfterDelete == offset - 1)
+ lastOffset = commitAfterDelete
+ },
+ StartStream(defaultTrigger),
+ CheckAnswerWithTimeout(60000, 2, 3, 4, 5, 6, 7, 8, 8),
+ StopStream,
+ Execute { q =>
+ val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+ val offset = q.offsetLog.getLatest().map(_._1).getOrElse(-1L)
+ assert(commit == offset && commit == lastOffset)
+ },
+ AddData(inputData, 8),
+ StartStream(defaultTrigger),
+ CheckAnswerWithTimeout(60000, 2, 3, 4, 5, 6, 7, 8, 8, 8, 9),
+ StopStream
+ )
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuiteBase.scala
new file mode 100644
index 000000000000..5bb01bdea26e
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuiteBase.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.execution.datasources.v2.LowLatencyClock
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.util.GlobalSingletonManualClock
+import org.apache.spark.sql.test.TestSparkSession
+
+/**
+ * Base class for tests that require real-time mode.
+ */
+trait StreamRealTimeModeSuiteBase extends StreamTest with Matchers {
+ override protected val defaultTrigger = RealTimeTrigger.apply("4 seconds")
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set(SQLConf.STREAMING_REAL_TIME_MODE_MIN_BATCH_DURATION,
+ defaultTrigger.batchDurationMs)
+ }
+
+ override protected def createSparkSession = new TestSparkSession(
+ new SparkContext(
+ "local[10]", // Ensure enough number of cores to ensure concurrent
schedule of all tasks.
+ "streaming-rtm-context",
+ sparkConf.set("spark.sql.testkey", "true")))
+}
+
+abstract class StreamRealTimeModeManualClockSuiteBase extends
StreamRealTimeModeSuiteBase {
+ var clock = new GlobalSingletonManualClock()
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ LowLatencyClock.setClock(clock)
+ }
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ GlobalSingletonManualClock.reset()
+ }
+
+ val advanceRealTimeClock = new ExternalAction {
+ override def runAction(): Unit = {
+ clock.advance(defaultTrigger.batchDurationMs)
+ }
+ }
+}
+
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 5b084c89eba1..3ff8cab64d65 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -27,7 +27,7 @@ import org.scalatest.{Assertions, BeforeAndAfterAll}
import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler,
TimeLimits}
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.exceptions.TestFailedDueToTimeoutException
-import org.scalatest.time.Span
+import org.scalatest.time.{Millis, Span}
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkEnv
@@ -172,6 +172,31 @@ trait StreamTest extends QueryTest with SharedSparkSession
with TimeLimits with
CheckAnswerRowsByFunc(globalCheckFunction, false)
}
+ object CheckAnswerWithTimeout {
+
+ def apply[A: Encoder](timeoutMs: Long, data: A*): CheckAnswerRowsNoWait = {
+ val toExternalRow = createToExternalRowConverter[A]()
+ CheckAnswerRowsNoWait(data.map(toExternalRow), timeoutMs)
+ }
+
+ def apply(timeoutMs: Long, rows: Row*): CheckAnswerRowsNoWait =
+ CheckAnswerRowsNoWait(rows, timeoutMs)
+ }
+
+ /**
+ * Used for testing Real-time mode where batches run for a fixed amount of
time
+ */
+ object CheckAnswerRowsContainsWithTimeout {
+
+ def apply[A: Encoder](timeoutMs: Long, data: A*):
CheckAnswerRowsContainsWithTimeout = {
+ val toExternalRow = createToExternalRowConverter[A]()
+ CheckAnswerRowsContainsWithTimeout(data.map(toExternalRow), timeoutMs)
+ }
+
+ def apply(timeoutMs: Long, rows: Row*): CheckAnswerRowsContainsWithTimeout
=
+ CheckAnswerRowsContainsWithTimeout(rows, timeoutMs)
+ }
+
/**
* Checks to make sure that the current data stored in the sink matches the
`expectedAnswer`.
* This operation automatically blocks until all added data has been
processed.
@@ -218,6 +243,20 @@ trait StreamTest extends QueryTest with SharedSparkSession
with TimeLimits with
override def toString: String = s"CheckNewAnswer:
${expectedAnswer.mkString(",")}"
}
+ case class CheckAnswerRowsNoWait(expectedAnswer: Seq[Row], waitTimeoutMs:
Long)
+ extends StreamAction
+ with StreamMustBeRunning {
+ override def toString: String = s"$operatorName:
${expectedAnswer.mkString(",")}"
+ private def operatorName = "CheckAnswerWithTimeout"
+ }
+
+ case class CheckAnswerRowsContainsWithTimeout(expectedAnswer: Seq[Row],
waitTimeoutMs: Long)
+ extends StreamAction
+ with StreamMustBeRunning {
+ override def toString: String = s"$operatorName:
${expectedAnswer.mkString(",")}"
+ private def operatorName = "CheckAnswerContainsWithTimeout"
+ }
+
object CheckNewAnswer {
def apply(): CheckNewAnswerRows = CheckNewAnswerRows(Seq.empty)
@@ -243,6 +282,8 @@ trait StreamTest extends QueryTest with SharedSparkSession
with TimeLimits with
/** Advance the trigger clock's time manually. */
case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
+ case class WaitUntilBatchProcessed(batchId: Long) extends StreamAction with
StreamMustBeRunning
+
/**
* Signals that a failure is expected and should not kill the test.
*
@@ -599,6 +640,25 @@ trait StreamTest extends QueryTest with SharedSparkSession
with TimeLimits with
s"Unexpected clock time after updating: " +
s"expecting $manualClockExpectedTime, current
${clock.getTimeMillis()}")
+ case WaitUntilBatchProcessed(batchId) =>
+ eventually("Next batch was never processed") {
+ if (!currentStream.exception.isDefined) {
+ assert(currentStream.commitLog.getLatestBatchId().getOrElse(-1L)
>= batchId)
+
+ // The progress gets updated after the commit log is updated,
but callers
+ // use WaitUntil(Current)BatchProcessed to check the streaming
query progress.
+ //
+ // Checking the progress alone is not enough because progress
can be posted due
+ // to idleness even if the commitLog has not been updated.
+ val latestProgressBatchId =
+
currentStream.recentProgress.lastOption.map(_.batchId).getOrElse(-1L)
+ assert(latestProgressBatchId >= batchId)
+ }
+ }
+ if (currentStream.exception.isDefined) {
+ throw currentStream.exception.get
+ }
+
case StopStream =>
verify(currentStream != null, "can not stop a stream that is not
running")
try failAfter(streamingTimeout) {
@@ -743,6 +803,23 @@ trait StreamTest extends QueryTest with SharedSparkSession
with TimeLimits with
case e: ExternalAction =>
e.runAction()
+
+ case CheckAnswerRowsNoWait(expectedAnswer, timeoutMs) =>
+ Eventually.eventually(Timeout(Span(timeoutMs, Millis))) {
+ val sparkAnswer = sink.allData
+ QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+ error => failTest(error)
+ }
+ }
+
+ case CheckAnswerRowsContainsWithTimeout(expectedAnswer, timeoutMs) =>
+ Eventually.eventually(Timeout(Span(timeoutMs, Millis))) {
+ val sparkAnswer = sink.allData
+ QueryTest.includesRows(expectedAnswer, sparkAnswer).foreach {
error =>
+ failTest(error)
+ }
+ }
+
case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
val sparkAnswer = fetchStreamAnswer(currentStream, lastOnly)
QueryTest.sameRows(expectedAnswer, sparkAnswer, isSorted).foreach {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/GlobalSingletonManualClock.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/GlobalSingletonManualClock.scala
new file mode 100644
index 000000000000..ac5baafece1a
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/GlobalSingletonManualClock.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.SparkContext.DRIVER_IDENTIFIER
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.util.RpcUtils
+
+/**
+ * A global manual clock that is backed by a singleton.
+ * Should be used when the whole query is running in one process.
+ */
+class GlobalSingletonManualClock extends StreamManualClock with Serializable {
+ override def getTimeMillis(): Long = {
+ GlobalSingletonManualClock.currentTime
+ }
+
+ override def advance(timeToAdd: Long): Unit = {
+ GlobalSingletonManualClock.currentTime =
GlobalSingletonManualClock.currentTime + timeToAdd
+ }
+}
+
+object GlobalSingletonManualClock {
+ @volatile var currentTime: Long = 0
+
+ def reset(): Unit = {
+ GlobalSingletonManualClock.currentTime = 0
+ }
+}
+
+/**
+ * Creates a manual clock that can be synced across driver and workers in
separate processes.
+ * A clock server is started on the driver and workers will connect to that
server to get the
+ * current time.
+ */
+class GlobalManualClock(endpointName: String)
+ extends StreamManualClock with Serializable with Logging {
+
+ private var clockServer: Option[GlobalSyncClockServer] = None
+ private var clockClient: Option[GlobalSyncClockClient] = None
+
+ private def getClock(): StreamManualClock = {
+ if (isDriver) {
+ if (clockServer.isEmpty) {
+ clockServer = Some(new GlobalSyncClockServer(endpointName))
+ clockServer.get.setup()
+ }
+ clockServer.get
+ } else {
+ if (clockClient.isEmpty) {
+ clockClient = Some(new GlobalSyncClockClient(endpointName))
+ }
+ clockClient.get
+ }
+ }
+
+ private def isDriver: Boolean = {
+ val executorId = SparkEnv.get.executorId
+ // Check for null to match the behavior of executorId == DRIVER_IDENTIFIER
+ executorId != null && executorId.startsWith(DRIVER_IDENTIFIER)
+ }
+
+ override def getTimeMillis(): Long = {
+ getClock().getTimeMillis()
+ }
+
+ override def advance(timeToAdd: Long): Unit = {
+ getClock().advance(timeToAdd)
+ }
+}
+
+class GlobalSyncClockServer(endpointName: String)
+ extends StreamManualClock with Serializable with Logging {
+ @volatile var currentTime: Long = 0
+
+ def setup(): Unit = {
+ val endpoint = new GlobalClockEndpoint(this)
+ val endpointRef = endpoint.rpcEnv.setupEndpoint(endpointName, endpoint)
+ }
+
+
+ override def getTimeMillis(): Long = {
+ currentTime
+ }
+
+ override def advance(timeToAdd: Long): Unit = {
+ currentTime += timeToAdd
+ }
+}
+
+class GlobalSyncClockClient(driverEndpointName: String)
+ extends StreamManualClock with Serializable with Logging {
+ @volatile var currentTime: Long = 0
+
+ private lazy val endpoint = RpcUtils.makeDriverRef(
+ driverEndpointName,
+ SparkEnv.get.conf,
+ SparkEnv.get.rpcEnv)
+
+ override def getTimeMillis(): Long = {
+ val result = endpoint.askSync[Long]()
+ result
+ }
+
+ override def advance(timeToAdd: Long): Unit = {
+ // scalastyle:off throwerror
+ throw new NotImplementedError()
+ // scalastyle:on throwerror
+ }
+}
+
+class GlobalClockEndpoint(clock: GlobalSyncClockServer)
+ extends ThreadSafeRpcEndpoint with Logging with Serializable {
+
+ override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
Unit] = {
+ case _ =>
+ context.reply(clock.getTimeMillis())
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]