This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c707f597e9c4 Add PipelineAnalysisContext message to support pipeline analysis during Spark Connect query execution
c707f597e9c4 is described below
commit c707f597e9c4fa60cfac754bfb6b27d2b9eadbec
Author: Jessie Luo <[email protected]>
AuthorDate: Wed Oct 22 20:09:33 2025 -0700
Add PipelineAnalysisContext message to support pipeline analysis during Spark Connect query execution
### What changes were proposed in this pull request?
This PR introduces a new protobuf message, `PipelineAnalysisContext`, in sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto.
### Why are the changes needed?
Special handling is needed for `spark.sql` queries in certain pipeline contexts. This proto provides a foundation for passing such context in the future.
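As a rough sketch (illustrative only, not part of this change; the field values below are hypothetical), a client could populate the new message through the regenerated Python bindings like so:

```python
from pyspark.sql.connect.proto import pipelines_pb2

# Both fields are proto3 `optional`; each can be set or left unset
# independently. The values here are made up for illustration.
context = pipelines_pb2.PipelineAnalysisContext(
    dataflow_graph_id="graph-1234",
    definition_path="/pipelines/my_pipeline/definition.py",
)

# Presence of optional fields is tracked explicitly.
assert context.HasField("dataflow_graph_id")

# Serializes like any protobuf message, e.g. for transport to the server.
payload = context.SerializeToString()
```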
### Does this PR introduce _any_ user-facing change?
No, it only adds an internal protobuf message.
### How was this patch tested?
Verified through protobuf compilation and existing test coverage.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52685 from cookiedough77/jessie.luo-data/add-analysis-context-proto.
Authored-by: Jessie Luo <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
---
python/pyspark/sql/connect/proto/pipelines_pb2.py | 14 ++--
python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 81 ++++++++++++++++++++++
.../main/protobuf/spark/connect/pipelines.proto | 13 ++++
3 files changed, 102 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index befeb48310cf..f3489f55ed87 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -42,7 +42,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x9c"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
+    b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x9c"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutp [...]
)
_globals = globals()
@@ -69,8 +69,8 @@ if not _descriptor._USE_C_DESCRIPTORS:
]._serialized_options = b"8\001"
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options =
b"8\001"
- _globals["_OUTPUTTYPE"]._serialized_start = 5774
- _globals["_OUTPUTTYPE"]._serialized_end = 5879
+ _globals["_OUTPUTTYPE"]._serialized_start = 6058
+ _globals["_OUTPUTTYPE"]._serialized_end = 6163
_globals["_PIPELINECOMMAND"]._serialized_start = 195
_globals["_PIPELINECOMMAND"]._serialized_end = 4575
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129
@@ -122,7 +122,9 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals["_PIPELINEEVENT"]._serialized_start = 5407
_globals["_PIPELINEEVENT"]._serialized_end = 5523
_globals["_SOURCECODELOCATION"]._serialized_start = 5526
- _globals["_SOURCECODELOCATION"]._serialized_end = 5701
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5703
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5772
+ _globals["_SOURCECODELOCATION"]._serialized_end = 5767
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5769
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5838
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5841
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6056
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index a2714b86af94..b9170e763ed9 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -1361,11 +1361,14 @@ class SourceCodeLocation(google.protobuf.message.Message):
FILE_NAME_FIELD_NUMBER: builtins.int
LINE_NUMBER_FIELD_NUMBER: builtins.int
+ DEFINITION_PATH_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
file_name: builtins.str
"""The file that this pipeline source code was defined in."""
line_number: builtins.int
"""The specific line number that this pipeline source code is located at,
if applicable."""
+ definition_path: builtins.str
+ """The path of the top-level pipeline file determined at runtime during
pipeline initialization."""
@property
def extension(
self,
@@ -1382,15 +1385,20 @@ class SourceCodeLocation(google.protobuf.message.Message):
*,
file_name: builtins.str | None = ...,
line_number: builtins.int | None = ...,
+ definition_path: builtins.str | None = ...,
extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
+ "_definition_path",
+ b"_definition_path",
"_file_name",
b"_file_name",
"_line_number",
b"_line_number",
+ "definition_path",
+ b"definition_path",
"file_name",
b"file_name",
"line_number",
@@ -1400,10 +1408,14 @@ class SourceCodeLocation(google.protobuf.message.Message):
def ClearField(
self,
field_name: typing_extensions.Literal[
+ "_definition_path",
+ b"_definition_path",
"_file_name",
b"_file_name",
"_line_number",
b"_line_number",
+ "definition_path",
+ b"definition_path",
"extension",
b"extension",
"file_name",
@@ -1413,6 +1425,10 @@ class SourceCodeLocation(google.protobuf.message.Message):
],
) -> None: ...
@typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_definition_path", b"_definition_path"]
+ ) -> typing_extensions.Literal["definition_path"] | None: ...
+ @typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"]
) -> typing_extensions.Literal["file_name"] | None: ...
@@ -1445,3 +1461,68 @@ class PipelineQueryFunctionExecutionSignal(google.protobuf.message.Message):
) -> None: ...
global___PipelineQueryFunctionExecutionSignal = PipelineQueryFunctionExecutionSignal
+
+class PipelineAnalysisContext(google.protobuf.message.Message):
+ """Metadata providing context about the pipeline during Spark Connect
query analysis."""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int
+ DEFINITION_PATH_FIELD_NUMBER: builtins.int
+ EXTENSION_FIELD_NUMBER: builtins.int
+ dataflow_graph_id: builtins.str
+ """Unique identifier of the dataflow graph associated with this
pipeline."""
+ definition_path: builtins.str
+ """The path of the top-level pipeline file determined at runtime during
pipeline initialization."""
+ @property
+ def extension(
+ self,
+ ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ google.protobuf.any_pb2.Any
+ ]:
+ """Reserved field for protocol extensions."""
+ def __init__(
+ self,
+ *,
+ dataflow_graph_id: builtins.str | None = ...,
+ definition_path: builtins.str | None = ...,
+ extension: collections.abc.Iterable[google.protobuf.any_pb2.Any] | None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_dataflow_graph_id",
+ b"_dataflow_graph_id",
+ "_definition_path",
+ b"_definition_path",
+ "dataflow_graph_id",
+ b"dataflow_graph_id",
+ "definition_path",
+ b"definition_path",
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_dataflow_graph_id",
+ b"_dataflow_graph_id",
+ "_definition_path",
+ b"_definition_path",
+ "dataflow_graph_id",
+ b"dataflow_graph_id",
+ "definition_path",
+ b"definition_path",
+ "extension",
+ b"extension",
+ ],
+ ) -> None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"]
+ ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_definition_path", b"_definition_path"]
+ ) -> typing_extensions.Literal["definition_path"] | None: ...
+
+global___PipelineAnalysisContext = PipelineAnalysisContext
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 41939ec2548f..c6a5e571f979 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -267,6 +267,8 @@ message SourceCodeLocation {
optional string file_name = 1;
// The specific line number that this pipeline source code is located at, if applicable.
optional int32 line_number = 2;
+ // The path of the top-level pipeline file determined at runtime during pipeline initialization.
+ optional string definition_path = 3;
// Reserved field for protocol extensions.
// Used to support forward-compatibility by carrying additional fields
@@ -280,3 +282,14 @@ message SourceCodeLocation {
message PipelineQueryFunctionExecutionSignal {
repeated string flow_names = 1;
}
+
+// Metadata providing context about the pipeline during Spark Connect query analysis.
+message PipelineAnalysisContext {
+ // Unique identifier of the dataflow graph associated with this pipeline.
+ optional string dataflow_graph_id = 1;
+ // The path of the top-level pipeline file determined at runtime during pipeline initialization.
+ optional string definition_path = 2;
+
+ // Reserved field for protocol extensions.
+ repeated google.protobuf.Any extension = 999;
+}
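For reference, a hedged sketch of how this message could be carried through a `repeated google.protobuf.Any extension` field like the one defined above. The values and the overall flow are illustrative assumptions, not an API this commit adds:

```python
from google.protobuf import any_pb2
from pyspark.sql.connect.proto import pipelines_pb2

context = pipelines_pb2.PipelineAnalysisContext(
    dataflow_graph_id="graph-1234",  # hypothetical value
)

# Pack() stores the type URL alongside the serialized bytes, which is
# what makes the Any-based extension fields forward compatible.
wrapped = any_pb2.Any()
wrapped.Pack(context)

# A receiver that recognizes the type can check for it and unpack it;
# unknown types can simply be skipped.
if wrapped.Is(pipelines_pb2.PipelineAnalysisContext.DESCRIPTOR):
    unpacked = pipelines_pb2.PipelineAnalysisContext()
    wrapped.Unpack(unpacked)
```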