This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5da9d2a47e3a [SPARK-54867][SS] Introduce NamedStreamingRelation 
wrapper for source identification during analysis
5da9d2a47e3a is described below

commit 5da9d2a47e3a844099f5cc668e1a8a0f7fd19c54
Author: ericm-db <[email protected]>
AuthorDate: Fri Jan 2 11:55:27 2026 -0800

    [SPARK-54867][SS] Introduce NamedStreamingRelation wrapper for source 
identification during analysis
    
    ### What changes were proposed in this pull request?
    
    This PR introduces infrastructure for tracking and propagating source 
identifying names through query analysis for streaming queries. It adds:
    
    1. **StreamingSourceIdentifyingName** - A sealed trait hierarchy 
representing the naming state of streaming sources:
       - `UserProvided(name)` - Explicitly set via `.name()` API
       - `FlowAssigned(name)` - Assigned by external flow systems (e.g., SDP)
       - `Unassigned` - No name assigned yet (to be auto-generated)
    
    2. **NamedStreamingRelation** - A transparent wrapper node that:
       - Carries source identifying names through the analyzer phase
       - Extends `UnaryNode` for transparent interaction with analyzer rules
       - Stays unresolved until explicitly unwrapped by a future 
`NameStreamingSources` analyzer rule
       - Provides `withUserProvidedName()` to attach user-specified names
    
    3. **NAMED_STREAMING_RELATION** tree pattern for efficient pattern matching
    
    ### Why are the changes needed?
    
    Streaming sources need stable, predictable names for:
    - **Checkpoint location stability** - Schema evolution and offset tracking 
require consistent source identification
    - **Schema lookup at specific offsets** - Analysis-time operations need to 
reference sources by name
    - **Flow integration** - SDP and similar systems need per-source metadata 
paths
    
    By introducing this wrapper during analysis (rather than at execution 
planning), we enable these capabilities while maintaining a clean separation 
between parsing, analysis, and execution phases.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit tests in `NamedStreamingRelationSuite` covering:
    - Source name state transitions (Unassigned → UserProvided)
    - Output delegation to child plan
    - Tree pattern registration
    - Resolved state behavior
    - String representation
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53639 from ericm-db/named-streaming-relation.
    
    Authored-by: ericm-db <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../catalyst/analysis/NamedStreamingRelation.scala |  85 +++++++++++++++
 .../streaming/StreamingSourceIdentifyingName.scala |  51 +++++++++
 .../spark/sql/catalyst/trees/TreePatterns.scala    |   1 +
 .../analysis/NamedStreamingRelationSuite.scala     | 120 +++++++++++++++++++++
 4 files changed, 257 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelation.scala
new file mode 100644
index 000000000000..05c45cdcaff3
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelation.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import 
org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, 
UserProvided}
+import 
org.apache.spark.sql.catalyst.trees.TreePattern.{NAMED_STREAMING_RELATION, 
TreePattern}
+
+/**
+ * A wrapper for streaming relations that carries a source identifying name 
through analysis.
+ *
+ * This node is introduced during query parsing/resolution and is removed by 
the
+ * [[NameStreamingSources]] analyzer rule. It serves to:
+ * 1. Track user-provided source names from `.name()` API
+ * 2. Track flow-assigned names from SDP context
+ * 3. Ensure all sources have names before execution planning
+ *
+ * By extending [[UnaryNode]], this wrapper is transparent to analyzer rules - 
they naturally
+ * descend into the child plan via `mapChildren`, resolve it, and the wrapper 
persists with the
+ * updated child. This eliminates the need for explicit handling in most 
analyzer rules.
+ *
+ * The naming happens in the analyzer (before execution) to enable:
+ * - Schema lookup at specific offsets during analysis
+ * - Stable checkpoint locations for source evolution
+ * - SDP flow integration with per-source metadata paths
+ *
+ * @param child The underlying streaming relation (UnresolvedDataSource, etc.)
+ * @param sourceIdentifyingName The source identifying name (UserProvided, 
FlowAssigned,
+ *                              or Unassigned)
+ */
+case class NamedStreamingRelation(
+    child: LogicalPlan,
+    sourceIdentifyingName: StreamingSourceIdentifyingName)
+  extends UnaryNode {
+
+  override def isStreaming: Boolean = true
+
+  // Delegate output to child for transparent wrapping
+  override def output: Seq[Attribute] = child.output
+
+  // Keep unresolved until NameStreamingSources explicitly unwraps this node.
+  // This ensures the wrapper persists through analysis until we're ready to
+  // propagate the sourceIdentifyingName to the underlying StreamingRelationV2.
+  override lazy val resolved: Boolean = false
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
NamedStreamingRelation =
+    copy(child = newChild)
+
+  /**
+   * Attaches a user-provided name from the `.name()` API.
+   * If nameOpt is None, returns this node unchanged.
+   *
+   * @param nameOpt The user-provided source name
+   * @return A new NamedStreamingRelation with the user name attached
+   */
+  def withUserProvidedName(nameOpt: Option[String]): NamedStreamingRelation = {
+    nameOpt.map { n =>
+      copy(sourceIdentifyingName = UserProvided(n))
+    }.getOrElse {
+      this
+    }
+  }
+
+  override val nodePatterns: Seq[TreePattern] = Seq(NAMED_STREAMING_RELATION)
+
+  override def toString: String = {
+    s"NamedStreamingRelation($child, $sourceIdentifyingName)"
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
new file mode 100644
index 000000000000..ce5ac5be3b5a
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.streaming
+
+/**
+ * Represents the identifying name state for a streaming source during query 
analysis.
+ *
+ * Source names can be:
+ * - User-provided via the `.name()` API
+ * - Flow-assigned by external systems (e.g., SDP)
+ * - Unassigned, to be auto-generated during analysis
+ */
+sealed trait StreamingSourceIdentifyingName {
+  override def toString: String = this match {
+    case UserProvided(name) => s"""name="$name""""
+    case FlowAssigned(name) => s"""name="$name""""
+    case Unassigned => "name=<Unassigned>"
+  }
+}
+
+/**
+ * A source name explicitly provided by the user via the `.name()` API.
+ * Takes highest precedence.
+ */
+case class UserProvided(name: String) extends StreamingSourceIdentifyingName
+
+/**
+ * A source name assigned by an external flow system (e.g., SDP).
+ * Used when the source is part of a managed pipeline.
+ */
+case class FlowAssigned(name: String) extends StreamingSourceIdentifyingName
+
+/**
+ * No name has been assigned yet. The analyzer will auto-generate one if 
needed.
+ */
+case object Unassigned extends StreamingSourceIdentifyingName
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index a650eb8ed536..d3398147d31a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -179,6 +179,7 @@ object TreePattern extends Enumeration  {
   val WITH_WINDOW_DEFINITION: Value = Value
 
   // Unresolved Plan patterns (Alphabetically ordered)
+  val NAMED_STREAMING_RELATION: Value = Value
   val PLAN_WITH_UNRESOLVED_IDENTIFIER: Value = Value
   val UNRESOLVED_EVENT_TIME_WATERMARK: Value = Value
   val UNRESOLVED_HAVING: Value = Value
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelationSuite.scala
new file mode 100644
index 000000000000..24e5d501dc5e
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/NamedStreamingRelationSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, Unassigned, 
UserProvided}
+import org.apache.spark.sql.catalyst.trees.TreePattern
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Unit tests for the NamedStreamingRelation wrapper node.
+ */
+class NamedStreamingRelationSuite extends SparkFunSuite {
+
+  private def createMockPlan(): LocalRelation = {
+    LocalRelation(AttributeReference("id", IntegerType)())
+  }
+
+  test("Unassigned sourceIdentifyingName") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+
+    assert(wrapper.child eq plan)
+    assert(wrapper.sourceIdentifyingName == Unassigned)
+  }
+
+  test("withUserProvidedName sets UserProvided name") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+    val named = wrapper.withUserProvidedName(Some("my_source"))
+
+    assert(named.sourceIdentifyingName == UserProvided("my_source"))
+  }
+
+  test("withUserProvidedName(None) returns same instance") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+    val result = wrapper.withUserProvidedName(None)
+
+    assert(result eq wrapper)
+  }
+
+  test("isStreaming returns true") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+
+    assert(wrapper.isStreaming)
+  }
+
+  test("has NAMED_STREAMING_RELATION tree pattern") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+
+    assert(wrapper.nodePatterns.contains(TreePattern.NAMED_STREAMING_RELATION))
+  }
+
+  test("resolved is false until unwrapped") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, UserProvided("test"))
+
+    // Even with a named child, wrapper stays unresolved
+    assert(!wrapper.resolved)
+  }
+
+  test("output delegates to child") {
+    val plan = createMockPlan()
+    val wrapper = NamedStreamingRelation(plan, Unassigned)
+
+    assert(wrapper.output == plan.output)
+  }
+
+  test("pattern matching on sourceIdentifyingName variants") {
+    val plan = createMockPlan()
+
+    val userProvided = NamedStreamingRelation(plan, UserProvided("test"))
+    val flowAssigned = NamedStreamingRelation(plan, FlowAssigned("0"))
+    val unassigned = NamedStreamingRelation(plan, Unassigned)
+
+    def extractName(wrapper: NamedStreamingRelation): Option[String] = {
+      wrapper.sourceIdentifyingName match {
+        case UserProvided(n) => Some(n)
+        case FlowAssigned(n) => Some(n)
+        case Unassigned => None
+      }
+    }
+
+    assert(extractName(userProvided).contains("test"))
+    assert(extractName(flowAssigned).contains("0"))
+    assert(extractName(unassigned).isEmpty)
+  }
+
+  test("toString includes sourceIdentifyingName") {
+    val plan = createMockPlan()
+
+    val userNamed = NamedStreamingRelation(plan, UserProvided("my_source"))
+    val flowNamed = NamedStreamingRelation(plan, FlowAssigned("0"))
+    val unnamed = NamedStreamingRelation(plan, Unassigned)
+
+    assert(userNamed.toString.contains("name=\"my_source\""))
+    assert(flowNamed.toString.contains("name=\"0\""))
+    assert(unnamed.toString.contains("name=<Unassigned>"))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to