This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c707f597e9c4 Add PipelineAnalysisContext message to support pipeline analysis during Spark Connect query execution
c707f597e9c4 is described below

commit c707f597e9c4fa60cfac754bfb6b27d2b9eadbec
Author: Jessie Luo <[email protected]>
AuthorDate: Wed Oct 22 20:09:33 2025 -0700

    Add PipelineAnalysisContext message to support pipeline analysis during Spark Connect query execution
    
    ### What changes were proposed in this pull request?
    This PR introduces a new protobuf message, PipelineAnalysisContext, in sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto.
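    As a quick illustration (nothing beyond the message itself is added by this PR), the generated Python stubs expose the new message with explicit field presence; the values below are made up:
    
    ```python
    from pyspark.sql.connect.proto import pipelines_pb2
    
    # Both fields are declared `optional`, so presence is tracked explicitly.
    ctx = pipelines_pb2.PipelineAnalysisContext(
        dataflow_graph_id="example-graph-id",       # illustrative value
        definition_path="pipelines/definition.py",  # illustrative value
    )
    assert ctx.HasField("dataflow_graph_id")
    assert ctx.HasField("definition_path")
    ```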
    
    ### Why are the changes needed?
    Special handling is needed for spark.sql in certain contexts. This proto provides a foundation for passing such context in the future.
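    As a sketch of how such context might be carried (assumed future usage, not implemented in this PR), the message packs cleanly into a google.protobuf.Any, matching the shape of the `extension` fields already in pipelines.proto:
    
    ```python
    from google.protobuf import any_pb2
    
    from pyspark.sql.connect.proto import pipelines_pb2
    
    ctx = pipelines_pb2.PipelineAnalysisContext(dataflow_graph_id="example-graph-id")
    
    # Pack into an Any so a generic carrier field can transport it.
    packed = any_pb2.Any()
    packed.Pack(ctx)
    
    # A receiver can detect and unpack the context without a dedicated field.
    if packed.Is(pipelines_pb2.PipelineAnalysisContext.DESCRIPTOR):
        unpacked = pipelines_pb2.PipelineAnalysisContext()
        packed.Unpack(unpacked)
    ```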
    
    ### Does this PR introduce _any_ user-facing change?
    No, it only adds an internal protobuf message.
    
    ### How was this patch tested?
    Verified through protobuf compilation and existing test coverage.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52685 from cookiedough77/jessie.luo-data/add-analysis-context-proto.
    
    Authored-by: Jessie Luo <[email protected]>
    Signed-off-by: Sandy Ryza <[email protected]>
---
 python/pyspark/sql/connect/proto/pipelines_pb2.py  | 14 ++--
 python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 81 ++++++++++++++++++++++
 .../main/protobuf/spark/connect/pipelines.proto    | 13 ++++
 3 files changed, 102 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index befeb48310cf..f3489f55ed87 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x9c"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+    b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x9c"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
 )
 
 _globals = globals()
@@ -69,8 +69,8 @@ if not _descriptor._USE_C_DESCRIPTORS:
     ]._serialized_options = b"8\001"
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = 
b"8\001"
-    _globals["_OUTPUTTYPE"]._serialized_start = 5774
-    _globals["_OUTPUTTYPE"]._serialized_end = 5879
+    _globals["_OUTPUTTYPE"]._serialized_start = 6058
+    _globals["_OUTPUTTYPE"]._serialized_end = 6163
     _globals["_PIPELINECOMMAND"]._serialized_start = 195
     _globals["_PIPELINECOMMAND"]._serialized_end = 4575
     _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129
@@ -122,7 +122,9 @@ if not _descriptor._USE_C_DESCRIPTORS:
     _globals["_PIPELINEEVENT"]._serialized_start = 5407
     _globals["_PIPELINEEVENT"]._serialized_end = 5523
     _globals["_SOURCECODELOCATION"]._serialized_start = 5526
-    _globals["_SOURCECODELOCATION"]._serialized_end = 5701
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5703
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5772
+    _globals["_SOURCECODELOCATION"]._serialized_end = 5767
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5769
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5838
+    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5841
+    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6056
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index a2714b86af94..b9170e763ed9 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -1361,11 +1361,14 @@ class SourceCodeLocation(google.protobuf.message.Message):
 
     FILE_NAME_FIELD_NUMBER: builtins.int
     LINE_NUMBER_FIELD_NUMBER: builtins.int
+    DEFINITION_PATH_FIELD_NUMBER: builtins.int
     EXTENSION_FIELD_NUMBER: builtins.int
     file_name: builtins.str
     """The file that this pipeline source code was defined in."""
     line_number: builtins.int
     """The specific line number that this pipeline source code is located at, 
if applicable."""
+    definition_path: builtins.str
+    """The path of the top-level pipeline file determined at runtime during 
pipeline initialization."""
     @property
     def extension(
         self,
@@ -1382,15 +1385,20 @@ class SourceCodeLocation(google.protobuf.message.Message):
         *,
         file_name: builtins.str | None = ...,
         line_number: builtins.int | None = ...,
+        definition_path: builtins.str | None = ...,
        extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] | None = ...,
     ) -> None: ...
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "_definition_path",
+            b"_definition_path",
             "_file_name",
             b"_file_name",
             "_line_number",
             b"_line_number",
+            "definition_path",
+            b"definition_path",
             "file_name",
             b"file_name",
             "line_number",
@@ -1400,10 +1408,14 @@ class SourceCodeLocation(google.protobuf.message.Message):
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "_definition_path",
+            b"_definition_path",
             "_file_name",
             b"_file_name",
             "_line_number",
             b"_line_number",
+            "definition_path",
+            b"definition_path",
             "extension",
             b"extension",
             "file_name",
@@ -1413,6 +1425,10 @@ class SourceCodeLocation(google.protobuf.message.Message):
         ],
     ) -> None: ...
     @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_definition_path", 
b"_definition_path"]
+    ) -> typing_extensions.Literal["definition_path"] | None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_file_name", 
b"_file_name"]
     ) -> typing_extensions.Literal["file_name"] | None: ...
@@ -1445,3 +1461,68 @@ class PipelineQueryFunctionExecutionSignal(google.protobuf.message.Message):
     ) -> None: ...
 
 global___PipelineQueryFunctionExecutionSignal = PipelineQueryFunctionExecutionSignal
+
+class PipelineAnalysisContext(google.protobuf.message.Message):
+    """Metadata providing context about the pipeline during Spark Connect 
query analysis."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int
+    DEFINITION_PATH_FIELD_NUMBER: builtins.int
+    EXTENSION_FIELD_NUMBER: builtins.int
+    dataflow_graph_id: builtins.str
+    """Unique identifier of the dataflow graph associated with this 
pipeline."""
+    definition_path: builtins.str
+    """The path of the top-level pipeline file determined at runtime during 
pipeline initialization."""
+    @property
+    def extension(
+        self,
+    ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+        google.protobuf.any_pb2.Any
+    ]:
+        """Reserved field for protocol extensions."""
+    def __init__(
+        self,
+        *,
+        dataflow_graph_id: builtins.str | None = ...,
+        definition_path: builtins.str | None = ...,
+        extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] | None = ...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_dataflow_graph_id",
+            b"_dataflow_graph_id",
+            "_definition_path",
+            b"_definition_path",
+            "dataflow_graph_id",
+            b"dataflow_graph_id",
+            "definition_path",
+            b"definition_path",
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_dataflow_graph_id",
+            b"_dataflow_graph_id",
+            "_definition_path",
+            b"_definition_path",
+            "dataflow_graph_id",
+            b"dataflow_graph_id",
+            "definition_path",
+            b"definition_path",
+            "extension",
+            b"extension",
+        ],
+    ) -> None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_dataflow_graph_id", 
b"_dataflow_graph_id"]
+    ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_definition_path", 
b"_definition_path"]
+    ) -> typing_extensions.Literal["definition_path"] | None: ...
+
+global___PipelineAnalysisContext = PipelineAnalysisContext
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 41939ec2548f..c6a5e571f979 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -267,6 +267,8 @@ message SourceCodeLocation {
   optional string file_name = 1;
  // The specific line number that this pipeline source code is located at, if applicable.
   optional int32 line_number = 2;
+  // The path of the top-level pipeline file determined at runtime during pipeline initialization.
+  optional string definition_path = 3;
 
   // Reserved field for protocol extensions.
   // Used to support forward-compatibility by carrying additional fields
@@ -280,3 +282,14 @@ message SourceCodeLocation {
 message PipelineQueryFunctionExecutionSignal {
   repeated string flow_names = 1;
 }
+
+// Metadata providing context about the pipeline during Spark Connect query analysis.
+message PipelineAnalysisContext {
+  // Unique identifier of the dataflow graph associated with this pipeline.
+  optional string dataflow_graph_id = 1;
+  // The path of the top-level pipeline file determined at runtime during pipeline initialization.
+  optional string definition_path = 2;
+
+  // Reserved field for protocol extensions.
+  repeated google.protobuf.Any extension = 999;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
