This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 418cf56ed4dc [SPARK-53796][SDP] Add `extension` field to a few 
pipeline protos to support forward compatibility
418cf56ed4dc is described below

commit 418cf56ed4dc440b36ebab37d2aac898278215a8
Author: Yuheng Chang <[email protected]>
AuthorDate: Sat Oct 11 08:05:30 2025 -0700

    [SPARK-53796][SDP] Add `extension` field to a few pipeline protos to 
support forward compatibility
    
    ### What changes were proposed in this pull request?
    
    Adding a `google.protobuf.Any extension = 999;` field to the `PipelineCommand` 
and `SourceCodeLocation` protos to support forward compatibility by carrying 
additional pipeline command types and source code location fields that are not yet 
defined in this version of the proto.
    
    During the planning stage, the Spark Server will resolve the extension and 
dispatch the command / message to the correct handler.
    
    ### Why are the changes needed?
    
    To support forward-compatibility by carrying additional pipeline command 
types / dataset or flow's fields that are not yet defined in this version of 
the proto. Useful for platforms that want to extend pipeline commands with 
platform-specific capabilities.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Tests will be added alongside future features that use this new field.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52514 from SCHJonathan/jonathan-chang_data/add-extension.
    
    Authored-by: Yuheng Chang <[email protected]>
    Signed-off-by: Sandy Ryza <[email protected]>
---
 python/pyspark/sql/connect/proto/pipelines_pb2.py  | 104 ++++++++++-----------
 python/pyspark/sql/connect/proto/pipelines_pb2.pyi |  29 ++++++
 .../main/protobuf/spark/connect/pipelines.proto    |  11 +++
 3 files changed, 92 insertions(+), 52 deletions(-)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py 
b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index b2240e398ced..44a6bdbb2280 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as 
spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xe5!\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
 
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+    
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x9c"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
 
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
 )
 
 _globals = globals()
@@ -69,60 +69,60 @@ if not _descriptor._USE_C_DESCRIPTORS:
     ]._serialized_options = b"8\001"
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = 
b"8\001"
-    _globals["_OUTPUTTYPE"]._serialized_start = 5665
-    _globals["_OUTPUTTYPE"]._serialized_end = 5770
+    _globals["_OUTPUTTYPE"]._serialized_start = 5774
+    _globals["_OUTPUTTYPE"]._serialized_end = 5879
     _globals["_PIPELINECOMMAND"]._serialized_start = 195
-    _globals["_PIPELINECOMMAND"]._serialized_end = 4520
-    _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1074
-    _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1382
-    
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start 
= 1283
-    
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 
1341
-    _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1384
-    _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1474
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1477
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2728
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start = 
2013
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end = 
2414
+    _globals["_PIPELINECOMMAND"]._serialized_end = 4575
+    _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129
+    _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1437
+    
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start 
= 1338
+    
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 
1396
+    _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1439
+    _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1529
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1532
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2783
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start = 
2068
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end = 
2469
     _globals[
         "_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
-    ]._serialized_start = 2327
+    ]._serialized_start = 2382
     _globals[
         "_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
-    ]._serialized_end = 2393
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start = 
2417
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end = 
2626
-    
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
 = 2557
-    
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
 = 2615
-    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2731
-    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3592
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 
1283
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1341
-    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
 = 3325
-    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
 = 3422
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3424
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3482
-    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3595
-    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3917
-    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 
3920
-    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4119
-    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
 = 4122
-    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
 = 4280
-    
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 
4283
-    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end 
= 4504
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4523
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5275
-    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start 
= 4892
-    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 
4990
-    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 
4993
-    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 
5126
-    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 
5129
-    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5260
-    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5277
-    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5350
-    _globals["_PIPELINEEVENT"]._serialized_start = 5352
-    _globals["_PIPELINEEVENT"]._serialized_end = 5468
-    _globals["_SOURCECODELOCATION"]._serialized_start = 5470
-    _globals["_SOURCECODELOCATION"]._serialized_end = 5592
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5594
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5663
+    ]._serialized_end = 2448
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start = 
2472
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end = 
2681
+    
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
 = 2612
+    
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
 = 2670
+    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2786
+    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3647
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 
1338
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1396
+    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
 = 3380
+    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
 = 3477
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3479
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3537
+    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3650
+    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3972
+    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 
3975
+    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4174
+    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
 = 4177
+    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
 = 4335
+    
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 
4338
+    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end 
= 4559
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4578
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5330
+    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start 
= 4947
+    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 
5045
+    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 
5048
+    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 
5181
+    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 
5184
+    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5315
+    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5332
+    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5405
+    _globals["_PIPELINEEVENT"]._serialized_start = 5407
+    _globals["_PIPELINEEVENT"]._serialized_end = 5523
+    _globals["_SOURCECODELOCATION"]._serialized_start = 5526
+    _globals["_SOURCECODELOCATION"]._serialized_end = 5701
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5703
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5772
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi 
b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index 4da07a68d200..a2714b86af94 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -1021,6 +1021,7 @@ class PipelineCommand(google.protobuf.message.Message):
     DEFINE_SQL_GRAPH_ELEMENTS_FIELD_NUMBER: builtins.int
     GET_QUERY_FUNCTION_EXECUTION_SIGNAL_STREAM_FIELD_NUMBER: builtins.int
     DEFINE_FLOW_QUERY_FUNCTION_RESULT_FIELD_NUMBER: builtins.int
+    EXTENSION_FIELD_NUMBER: builtins.int
     @property
     def create_dataflow_graph(self) -> 
global___PipelineCommand.CreateDataflowGraph: ...
     @property
@@ -1041,6 +1042,13 @@ class PipelineCommand(google.protobuf.message.Message):
     def define_flow_query_function_result(
         self,
     ) -> global___PipelineCommand.DefineFlowQueryFunctionResult: ...
+    @property
+    def extension(self) -> google.protobuf.any_pb2.Any:
+        """Reserved field for protocol extensions.
+        Used to support forward-compatibility by carrying additional command 
types
+        that are not yet defined in this version of the proto. During 
planning, the
+        engine will resolve and dispatch the concrete command contained in 
this field.
+        """
     def __init__(
         self,
         *,
@@ -1054,6 +1062,7 @@ class PipelineCommand(google.protobuf.message.Message):
         | None = ...,
         define_flow_query_function_result: 
global___PipelineCommand.DefineFlowQueryFunctionResult
         | None = ...,
+        extension: google.protobuf.any_pb2.Any | None = ...,
     ) -> None: ...
     def HasField(
         self,
@@ -1072,6 +1081,8 @@ class PipelineCommand(google.protobuf.message.Message):
             b"define_sql_graph_elements",
             "drop_dataflow_graph",
             b"drop_dataflow_graph",
+            "extension",
+            b"extension",
             "get_query_function_execution_signal_stream",
             b"get_query_function_execution_signal_stream",
             "start_run",
@@ -1095,6 +1106,8 @@ class PipelineCommand(google.protobuf.message.Message):
             b"define_sql_graph_elements",
             "drop_dataflow_graph",
             b"drop_dataflow_graph",
+            "extension",
+            b"extension",
             "get_query_function_execution_signal_stream",
             b"get_query_function_execution_signal_stream",
             "start_run",
@@ -1113,6 +1126,7 @@ class PipelineCommand(google.protobuf.message.Message):
             "define_sql_graph_elements",
             "get_query_function_execution_signal_stream",
             "define_flow_query_function_result",
+            "extension",
         ]
         | None
     ): ...
@@ -1347,15 +1361,28 @@ class 
SourceCodeLocation(google.protobuf.message.Message):
 
     FILE_NAME_FIELD_NUMBER: builtins.int
     LINE_NUMBER_FIELD_NUMBER: builtins.int
+    EXTENSION_FIELD_NUMBER: builtins.int
     file_name: builtins.str
     """The file that this pipeline source code was defined in."""
     line_number: builtins.int
     """The specific line number that this pipeline source code is located at, 
if applicable."""
+    @property
+    def extension(
+        self,
+    ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+        google.protobuf.any_pb2.Any
+    ]:
+        """Reserved field for protocol extensions.
+        Used to support forward-compatibility by carrying additional fields
+        that are not yet defined in this version of the proto. During 
planning, the
+        engine will resolve and dispatch the concrete command contained in 
this field.
+        """
     def __init__(
         self,
         *,
         file_name: builtins.str | None = ...,
         line_number: builtins.int | None = ...,
+        extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] | 
None = ...,
     ) -> None: ...
     def HasField(
         self,
@@ -1377,6 +1404,8 @@ class SourceCodeLocation(google.protobuf.message.Message):
             b"_file_name",
             "_line_number",
             b"_line_number",
+            "extension",
+            b"extension",
             "file_name",
             b"file_name",
             "line_number",
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto 
b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 3cb555db497f..41939ec2548f 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -40,6 +40,11 @@ message PipelineCommand {
     DefineSqlGraphElements define_sql_graph_elements = 6;
     GetQueryFunctionExecutionSignalStream 
get_query_function_execution_signal_stream = 7;
     DefineFlowQueryFunctionResult define_flow_query_function_result = 8;
+    // Reserved field for protocol extensions.
+    // Used to support forward-compatibility by carrying additional command 
types
+    // that are not yet defined in this version of the proto. During planning, 
the
+    // engine will resolve and dispatch the concrete command contained in this 
field.
+    google.protobuf.Any extension = 999;
   }
 
   // Request to create a new dataflow graph.
@@ -262,6 +267,12 @@ message SourceCodeLocation {
   optional string file_name = 1;
   // The specific line number that this pipeline source code is located at, if 
applicable.
   optional int32 line_number = 2;
+
+  // Reserved field for protocol extensions.
+  // Used to support forward-compatibility by carrying additional fields
+  // that are not yet defined in this version of the proto. During planning, 
the
+  // engine will resolve and dispatch the concrete command contained in this 
field.
+  repeated google.protobuf.Any extension = 999;
 }
 
 // A signal from the server to the client to execute the query function for 
one or more flows, and


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to