This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ff9d41abaff [SPARK-43799][PYTHON] Add descriptor binary option to Pyspark Protobuf API
ff9d41abaff is described below
commit ff9d41abaffcbd6f0c26ce5be9d2324fe9f01d5c
Author: Raghu Angadi <[email protected]>
AuthorDate: Tue May 30 09:01:32 2023 +0900
[SPARK-43799][PYTHON] Add descriptor binary option to Pyspark Protobuf API
### What changes were proposed in this pull request?
This updates the PySpark Protobuf API to allow passing a binary FileDescriptorSet
rather than a file name. It is a Python follow-up to the feature implemented in
Scala in #41192.
### Why are the changes needed?
- This gives PySpark users the flexibility to provide the binary descriptor set
directly, as sketched below.
- Even when users pass a file path, PySpark now reads the descriptor file in
Python and passes its bytes to Scala rather than passing the file name. This
avoids having to read the file in Scala.
### Does this PR introduce _any_ user-facing change?
- Yes. This adds an extra `binaryDescriptorSet` argument to the `from_protobuf()` and `to_protobuf()` APIs.
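For illustration, a short sketch of the new call pattern, mirroring the doctests
in the diff below (a DataFrame `df` with a `value` column, the bytes `desc_bytes`
from the sketch above, and the message name `SimpleMessage` are assumed):

    from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

    # Encode using the binary descriptor set instead of a descriptor file path.
    proto_df = df.select(
        to_protobuf(df.value, "SimpleMessage", binaryDescriptorSet=desc_bytes).alias("value"))
    # Decode the same way; descFilePath stays unset.
    decoded_df = proto_df.select(
        from_protobuf(proto_df.value, "SimpleMessage", binaryDescriptorSet=desc_bytes).alias("value"))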
### How was this patch tested?
- Doc tests
- Manual tests
Closes #41343 from rangadi/py-proto-file-buffer.
Authored-by: Raghu Angadi <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/protobuf/functions.py | 74 +++++++++++++++++++++++++++-----
1 file changed, 64 insertions(+), 10 deletions(-)
diff --git a/python/pyspark/sql/protobuf/functions.py b/python/pyspark/sql/protobuf/functions.py
index a303cf91493..42165938eb7 100644
--- a/python/pyspark/sql/protobuf/functions.py
+++ b/python/pyspark/sql/protobuf/functions.py
@@ -37,13 +37,17 @@ def from_protobuf(
messageName: str,
descFilePath: Optional[str] = None,
options: Optional[Dict[str, str]] = None,
+ binaryDescriptorSet: Optional[bytes] = None,
) -> Column:
"""
Converts a binary column of Protobuf format into its corresponding catalyst value.
- The Protobuf definition is provided in one of these two ways:
+ The Protobuf definition is provided in one of these ways:
- Protobuf descriptor file: E.g. a descriptor file created with
`protoc --include_imports --descriptor_set_out=abc.desc abc.proto`
+ - Protobuf descriptor as binary: Rather than file path as in previous option,
+ we can provide the binary content of the file. This allows flexibility in how the
+ descriptor set is created and fetched.
- Jar containing Protobuf Java class: The jar containing Java class
should be shaded.
Specifically, `com.google.protobuf.*` should be shaded to
`org.sparkproject.spark_protobuf.protobuf.*`.
@@ -52,6 +56,9 @@ def from_protobuf(
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.5.0
+ Supports `binaryDescriptorSet` arg to pass binary descriptor directly.
+
Parameters
----------
data : :class:`~pyspark.sql.Column` or str
@@ -61,9 +68,11 @@ def from_protobuf(
The Protobuf class name when descFilePath parameter is not set.
E.g. `com.example.protos.ExampleEvent`.
descFilePath : str, optional
- The protobuf descriptor file.
+ The Protobuf descriptor file.
options : dict, optional
options to control how the protobuf record is parsed.
+ binaryDescriptorSet: bytes, optional
+ The Protobuf `FileDescriptorSet` serialized as binary.
Notes
-----
@@ -92,9 +101,14 @@ def from_protobuf(
... proto_df = df.select(
... to_protobuf(df.value, message_name, desc_file_path).alias("value"))
... proto_df.show(truncate=False)
- ... proto_df = proto_df.select(
+ ... proto_df_1 = proto_df.select( # With file name for descriptor
... from_protobuf(proto_df.value, message_name, desc_file_path).alias("value"))
- ... proto_df.show(truncate=False)
+ ... proto_df_1.show(truncate=False)
+ ... proto_df_2 = proto_df.select( # With binary for descriptor
+ ... from_protobuf(proto_df.value, message_name,
+ ... binaryDescriptorSet = bytearray.fromhex(desc_hex))
+ ... .alias("value"))
+ ... proto_df_2.show(truncate=False)
+----------------------------------------+
|value                                   |
+----------------------------------------+
@@ -105,6 +119,11 @@ def from_protobuf(
+------------------+
|{2, Alice, 109200}|
+------------------+
+ +------------------+
+ |value             |
+ +------------------+
+ |{2, Alice, 109200}|
+ +------------------+
>>> data = [([(1668035962, 2020)])]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
@@ -122,9 +141,14 @@ def from_protobuf(
sc = get_active_spark_context()
try:
- if descFilePath is not None:
+ binary_proto = None
+ if binaryDescriptorSet is not None:
+ binary_proto = binaryDescriptorSet
+ elif descFilePath is not None:
+ binary_proto = _read_descriptor_set_file(descFilePath)
+ if binary_proto is not None:
jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.from_protobuf(
- _to_java_column(data), messageName, descFilePath, options or {}
+ _to_java_column(data), messageName, binary_proto, options or {}
)
else:
jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.from_protobuf(
@@ -142,13 +166,17 @@ def to_protobuf(
messageName: str,
descFilePath: Optional[str] = None,
options: Optional[Dict[str, str]] = None,
+ binaryDescriptorSet: Optional[bytes] = None,
) -> Column:
"""
Converts a column into binary of protobuf format. The Protobuf definition is provided in one
- of these two ways:
+ of these ways:
- Protobuf descriptor file: E.g. a descriptor file created with
`protoc --include_imports --descriptor_set_out=abc.desc abc.proto`
+ - Protobuf descriptor as binary: Rather than file path as in previous option,
+ we can provide the binary content of the file. This allows flexibility in how the
+ descriptor set is created and fetched.
- Jar containing Protobuf Java class: The jar containing Java class
should be shaded.
Specifically, `com.google.protobuf.*` should be shaded to
`org.sparkproject.spark_protobuf.protobuf.*`.
@@ -157,6 +185,9 @@ def to_protobuf(
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.5.0
+ Supports `binaryDescriptorSet` arg to pass binary descriptor directly.
+
Parameters
----------
data : :class:`~pyspark.sql.Column` or str
@@ -168,6 +199,8 @@ def to_protobuf(
descFilePath : str, optional
the Protobuf descriptor file.
options : dict, optional
+ binaryDescriptorSet: bytes, optional
+ The Protobuf `FileDescriptorSet` serialized as binary.
Notes
-----
@@ -193,9 +226,19 @@ def to_protobuf(
... _ = f.write(bytearray.fromhex(desc_hex))
... f.flush()
... message_name = 'SimpleMessage'
- ... proto_df = df.select(
+ ... proto_df = df.select( # With file name for descriptor
... to_protobuf(df.value, message_name, desc_file_path).alias("suite"))
... proto_df.show(truncate=False)
+ ... proto_df_2 = df.select( # With binary for descriptor
+ ... to_protobuf(df.value, message_name,
+ ... binaryDescriptorSet=bytearray.fromhex(desc_hex))
+ ... .alias("suite"))
+ ... proto_df_2.show(truncate=False)
+ +-------------------------------------------+
+ |suite                                      |
+ +-------------------------------------------+
+ |[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+ +-------------------------------------------+
+-------------------------------------------+
|suite                                      |
+-------------------------------------------+
@@ -216,9 +259,14 @@ def to_protobuf(
sc = get_active_spark_context()
try:
- if descFilePath is not None:
+ binary_proto = None
+ if binaryDescriptorSet is not None:
+ binary_proto = binaryDescriptorSet
+ elif descFilePath is not None:
+ binary_proto = _read_descriptor_set_file(descFilePath)
+ if binary_proto is not None:
jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.to_protobuf(
- _to_java_column(data), messageName, descFilePath, options or {}
+ _to_java_column(data), messageName, binary_proto, options or {}
)
else:
jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.to_protobuf(
@@ -232,6 +280,12 @@ def to_protobuf(
return Column(jc)
+def _read_descriptor_set_file(filePath: str) -> bytes:
+ # TODO(SPARK-43847): Throw structured errors like "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND" etc.
+ with open(filePath, "rb") as f:
+ return f.read()
+
+
def _test() -> None:
import os
import sys
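One behavioral note on the dispatch added to both functions above: when
`binaryDescriptorSet` and `descFilePath` are both supplied, the binary argument
takes precedence, because `binary_proto` is assigned from it before the `elif`
branch that reads the file. A minimal sketch (argument values are illustrative,
reusing `df` and `desc_bytes` from the earlier sketches):

    # If both are passed, descFilePath is effectively ignored.
    df.select(
        from_protobuf(
            df.value, "SimpleMessage",
            descFilePath="abc.desc",          # not read when the binary is present
            binaryDescriptorSet=desc_bytes,   # this one wins
        ).alias("value"))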
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]