This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5da9d2a47e3a [SPARK-54867][SS] Introduce NamedStreamingRelation wrapper for source identification during analysis
5da9d2a47e3a is described below
commit 5da9d2a47e3a844099f5cc668e1a8a0f7fd19c54
Author: ericm-db <[email protected]>
AuthorDate: Fri Jan 2 11:55:27 2026 -0800
[SPARK-54867][SS] Introduce NamedStreamingRelation wrapper for source identification during analysis
### What changes were proposed in this pull request?
This PR introduces infrastructure for tracking the identifying names of streaming sources and propagating them through query analysis. It adds:
1. **StreamingSourceIdentifyingName** - A sealed trait hierarchy
representing the naming state of streaming sources:
- `UserProvided(name)` - Set explicitly via the `.name()` API
- `FlowAssigned(name)` - Assigned by external flow systems (e.g., SDP)
- `Unassigned` - No name assigned yet (to be auto-generated)
2. **NamedStreamingRelation** - A transparent wrapper node that:
- Carries source identifying names through the analyzer phase
- Extends `UnaryNode` for transparent interaction with analyzer rules
- Stays unresolved until explicitly unwrapped by a future
`NameStreamingSources` analyzer rule
- Provides `withUserProvidedName()` to attach user-specified names
3. **NAMED_STREAMING_RELATION** - A tree pattern that lets rules cheaply detect these wrappers and skip subtrees that contain none
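Items 2 and 3 make two structural claims: a unary wrapper survives analyzer rules that only rewrite its child, and a dedicated tree pattern lets rules skip subtrees that contain no wrapper. The following is a minimal, self-contained Scala sketch of both ideas. It is a toy model, not Catalyst: `Plan`, `NamedWrapper`, `NamedRelationTag`, `subtreePatterns`, and `rewriteUp` are hypothetical stand-ins for `LogicalPlan`, `NamedStreamingRelation`, `TreePattern.NAMED_STREAMING_RELATION`, tree-pattern bits, and Catalyst's pruned transforms, simplified under the assumption that a subtree's pattern set is just the union of its nodes' patterns.

```scala
// Toy plan tree (NOT Catalyst): shows why a unary wrapper survives generic
// child rewrites, and how subtree pattern sets let a rewrite skip subtrees.

// Hypothetical tag standing in for TreePattern.NAMED_STREAMING_RELATION.
sealed trait Pattern
case object NamedRelationTag extends Pattern

sealed trait Plan {
  def children: Seq[Plan]
  def withNewChildren(newChildren: Seq[Plan]): Plan

  // Tags contributed by this node alone (cf. Catalyst's `nodePatterns`).
  def nodePatterns: Set[Pattern] = Set.empty

  // Tags present anywhere in this subtree: own tags plus all children's.
  lazy val subtreePatterns: Set[Pattern] =
    children.foldLeft(nodePatterns)(_ ++ _.subtreePatterns)

  // Bottom-up rewrite. If `cond` is false for a node, that node and its whole
  // subtree are returned untouched, without descending into them.
  def rewriteUp(cond: Plan => Boolean)(rule: PartialFunction[Plan, Plan]): Plan =
    if (!cond(this)) this
    else {
      val updated = withNewChildren(children.map(_.rewriteUp(cond)(rule)))
      rule.lift(updated).getOrElse(updated)
    }
}

object Plan {
  // Pruning condition: only visit subtrees that contain at least one wrapper.
  val containsWrapper: Plan => Boolean = _.subtreePatterns.contains(NamedRelationTag)
}

final case class UnresolvedSource(format: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}

final case class ResolvedSource(format: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}

final case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withNewChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

// Stand-in for NamedStreamingRelation: a unary node whose only job is to carry
// a name. As with withNewChildInternal, replacing the child keeps the wrapper.
final case class NamedWrapper(child: Plan, name: Option[String]) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withNewChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  override def nodePatterns: Set[Pattern] = Set(NamedRelationTag)
}
```

For example, rewriting `Project(NamedWrapper(UnresolvedSource("rate"), Some("s1")))` with `_ => true` and a rule that only matches `UnresolvedSource` yields `Project(NamedWrapper(ResolvedSource("rate"), Some("s1")))`. The rule never mentions the wrapper, yet the wrapper and its name survive. Rewriting a wrapper-free plan with `Plan.containsWrapper` returns the same instance without descending into it.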
### Why are the changes needed?
Streaming sources need stable, predictable names for:
- **Checkpoint location stability** - Schema evolution and offset tracking
require consistent source identification
- **Schema lookup at specific offsets** - Analysis-time operations need to
reference sources by name
- **Flow integration** - SDP and similar systems need per-source metadata
paths
By introducing this wrapper during analysis (rather than at execution
planning), we enable these capabilities while maintaining a clean separation
between parsing, analysis, and execution phases.
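This commit leaves the unwrapping rule for a follow-up, so the following is only a hypothetical sketch of what "every source has a name before execution planning" could look like. It reuses the three naming states from this PR in a standalone model. `NameSourcesSketch`, `StreamSource`, `NamedSource`, `Wrapper`, `Union`, and the ordinal-based auto-naming scheme are all assumptions for illustration, not the design of the future `NameStreamingSources` rule.

```scala
// Standalone model of an analysis-time naming pass; NOT Spark code.
// The ordinal-based auto-naming below is an illustrative assumption.
sealed trait SourceName
final case class UserProvided(name: String) extends SourceName
final case class FlowAssigned(name: String) extends SourceName
case object Unassigned extends SourceName

sealed trait Plan
// An unnamed streaming source as produced by parsing (hypothetical).
final case class StreamSource(format: String) extends Plan
// A source after unwrapping, carrying its final identifying name (hypothetical).
final case class NamedSource(format: String, name: String) extends Plan
// Stand-in for NamedStreamingRelation: wraps a source with its naming state.
final case class Wrapper(child: StreamSource, sourceName: SourceName) extends Plan
final case class Union(children: Seq[Plan]) extends Plan

object NameSourcesSketch {
  // Replaces every Wrapper with a NamedSource, so no naming state survives
  // past "analysis". Unassigned sources are numbered in plan order.
  def apply(plan: Plan): Plan = {
    var nextOrdinal = 0
    def freshName(): String = {
      val name = nextOrdinal.toString
      nextOrdinal += 1
      name
    }
    def go(p: Plan): Plan = p match {
      case Wrapper(StreamSource(fmt), UserProvided(n)) => NamedSource(fmt, n)
      case Wrapper(StreamSource(fmt), FlowAssigned(n)) => NamedSource(fmt, n)
      case Wrapper(StreamSource(fmt), Unassigned)      => NamedSource(fmt, freshName())
      case Union(children)                             => Union(children.map(go))
      case other                                       => other
    }
    go(plan)
  }

  // True when no Wrapper remains anywhere in the plan.
  def fullyNamed(plan: Plan): Boolean = plan match {
    case _: Wrapper      => false
    case Union(children) => children.forall(fullyNamed)
    case _               => true
  }
}
```

Given a `Union` of four wrapped sources whose states are `UserProvided("orders")`, `Unassigned`, `FlowAssigned("7")`, and `Unassigned`, the sketch yields the names `orders`, `0`, `7`, and `1`, and `fullyNamed` holds afterwards. Numbering only the unassigned sources in plan order keeps generated names deterministic for an unchanged query; a real rule would also need to prevent collisions between generated and explicit names.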
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `NamedStreamingRelationSuite` covering:
- Source name state transitions (Unassigned → UserProvided)
- Output delegation to child plan
- Tree pattern registration
- Resolved state behavior
- String representation
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53639 from ericm-db/named-streaming-relation.
Authored-by: ericm-db <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../catalyst/analysis/NamedStreamingRelation.scala | 85 +++++++++++++++
.../streaming/StreamingSourceIdentifyingName.scala | 51 +++++++++
.../spark/sql/catalyst/trees/TreePatterns.scala | 1 +
.../analysis/NamedStreamingRelationSuite.scala | 120 +++++++++++++++++++++
4 files changed, 257 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelation.scala
new file mode 100644
index 000000000000..05c45cdcaff3
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelation.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, UserProvided}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{NAMED_STREAMING_RELATION, TreePattern}
+
+/**
+ * A wrapper for streaming relations that carries a source identifying name through analysis.
+ *
+ * This node is introduced during query parsing/resolution and is removed by the
+ * [[NameStreamingSources]] analyzer rule. It serves to:
+ * 1. Track user-provided source names from `.name()` API
+ * 2. Track flow-assigned names from SDP context
+ * 3. Ensure all sources have names before execution planning
+ *
+ * By extending [[UnaryNode]], this wrapper is transparent to analyzer rules - they naturally
+ * descend into the child plan via `mapChildren`, resolve it, and the wrapper persists with the
+ * updated child. This eliminates the need for explicit handling in most analyzer rules.
+ *
+ * The naming happens in the analyzer (before execution) to enable:
+ * - Schema lookup at specific offsets during analysis
+ * - Stable checkpoint locations for source evolution
+ * - SDP flow integration with per-source metadata paths
+ *
+ * @param child The underlying streaming relation (UnresolvedDataSource, etc.)
+ * @param sourceIdentifyingName The source identifying name (UserProvided, FlowAssigned,
+ *   or Unassigned)
+ */
+case class NamedStreamingRelation(
+    child: LogicalPlan,
+    sourceIdentifyingName: StreamingSourceIdentifyingName)
+  extends UnaryNode {
+
+  override def isStreaming: Boolean = true
+
+  // Delegate output to child for transparent wrapping
+  override def output: Seq[Attribute] = child.output
+
+  // Keep unresolved until NameStreamingSources explicitly unwraps this node.
+  // This ensures the wrapper persists through analysis until we're ready to
+  // propagate the sourceIdentifyingName to the underlying StreamingRelationV2.
+  override lazy val resolved: Boolean = false
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): NamedStreamingRelation =
+    copy(child = newChild)
+
+  /**
+   * Attaches a user-provided name from the `.name()` API.
+   * If nameOpt is None, returns this node unchanged.
+   *
+   * @param nameOpt The user-provided source name
+   * @return A new NamedStreamingRelation with the user name attached
+   */
+  def withUserProvidedName(nameOpt: Option[String]): NamedStreamingRelation = {
+    nameOpt.map { n =>
+      copy(sourceIdentifyingName = UserProvided(n))
+    }.getOrElse {
+      this
+    }
+  }
+
+  override val nodePatterns: Seq[TreePattern] = Seq(NAMED_STREAMING_RELATION)
+
+  override def toString: String = {
+    s"NamedStreamingRelation($child, $sourceIdentifyingName)"
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
new file mode 100644
index 000000000000..ce5ac5be3b5a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.streaming
+
+/**
+ * Represents the identifying name state for a streaming source during query analysis.
+ *
+ * Source names can be:
+ * - User-provided via the `.name()` API
+ * - Flow-assigned by external systems (e.g., SDP)
+ * - Unassigned, to be auto-generated during analysis
+ */
+sealed trait StreamingSourceIdentifyingName {
+  override def toString: String = this match {
+    case UserProvided(name) => s"""name="$name""""
+    case FlowAssigned(name) => s"""name="$name""""
+    case Unassigned => "name=<Unassigned>"
+  }
+}
+
+/**
+ * A source name explicitly provided by the user via the `.name()` API.
+ * Takes highest precedence.
+ */
+case class UserProvided(name: String) extends StreamingSourceIdentifyingName
+
+/**
+ * A source name assigned by an external flow system (e.g., SDP).
+ * Used when the source is part of a managed pipeline.
+ */
+case class FlowAssigned(name: String) extends StreamingSourceIdentifyingName
+
+/**
+ * No name has been assigned yet. The analyzer will auto-generate one if needed.
+ */
+case object Unassigned extends StreamingSourceIdentifyingName
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index a650eb8ed536..d3398147d31a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -179,6 +179,7 @@ object TreePattern extends Enumeration {
   val WITH_WINDOW_DEFINITION: Value = Value
 
   // Unresolved Plan patterns (Alphabetically ordered)
+  val NAMED_STREAMING_RELATION: Value = Value
   val PLAN_WITH_UNRESOLVED_IDENTIFIER: Value = Value
   val UNRESOLVED_EVENT_TIME_WATERMARK: Value = Value
   val UNRESOLVED_HAVING: Value = Value
diff --git
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelationSuite.scala
new file mode 100644
index 000000000000..24e5d501dc5e
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelationSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, Unassigned, UserProvided}
+import org.apache.spark.sql.catalyst.trees.TreePattern
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Unit tests for the NamedStreamingRelation wrapper node.
+ */
+class NamedStreamingRelationSuite extends SparkFunSuite {
+
+  private def createMockPlan(): LocalRelation = {
+    LocalRelation(AttributeReference("id", IntegerType)())
+  }
+
+  test("Unassigned sourceIdentifyingName") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+
+    assert(wrapper.child eq plan)
+    assert(wrapper.sourceIdentifyingName == Unassigned)
+  }
+
+  test("withUserProvidedName sets UserProvided name") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+    val named = wrapper.withUserProvidedName(Some("my_source"))
+
+    assert(named.sourceIdentifyingName == UserProvided("my_source"))
+  }
+
+  test("withUserProvidedName(None) returns same instance") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+    val result = wrapper.withUserProvidedName(None)
+
+    assert(result eq wrapper)
+  }
+
+  test("isStreaming returns true") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+
+    assert(wrapper.isStreaming)
+  }
+
+  test("has NAMED_STREAMING_RELATION tree pattern") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+
+    assert(wrapper.nodePatterns.contains(TreePattern.NAMED_STREAMING_RELATION))
+  }
+
+  test("resolved is false until unwrapped") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, UserProvided("test"))
+
+    // Even with a named child, wrapper stays unresolved
+    assert(!wrapper.resolved)
+  }
+
+  test("output delegates to child") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+
+    assert(wrapper.output == plan.output)
+  }
+
+  test("pattern matching on sourceIdentifyingName variants") {
+    val plan = createMockPlan()
+
+    val userProvided = NamedStreamingRelation(plan, UserProvided("test"))
+    val flowAssigned = NamedStreamingRelation(plan, FlowAssigned("0"))
+    val unassigned = NamedStreamingRelation(plan, Unassigned)
+
+    def extractName(wrapper: NamedStreamingRelation): Option[String] = {
+      wrapper.sourceIdentifyingName match {
+        case UserProvided(n) => Some(n)
+        case FlowAssigned(n) => Some(n)
+        case Unassigned => None
+      }
+    }
+
+    assert(extractName(userProvided).contains("test"))
+    assert(extractName(flowAssigned).contains("0"))
+    assert(extractName(unassigned).isEmpty)
+  }
+
+  test("toString includes sourceIdentifyingName") {
+    val plan = createMockPlan()
+
+    val userNamed = NamedStreamingRelation(plan, UserProvided("my_source"))
+    val flowNamed = NamedStreamingRelation(plan, FlowAssigned("0"))
+    val unnamed = NamedStreamingRelation(plan, Unassigned)
+
+    assert(userNamed.toString.contains("name=\"my_source\""))
+    assert(flowNamed.toString.contains("name=\"0\""))
+    assert(unnamed.toString.contains("name=<Unassigned>"))
+  }
+}