This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e5d23afad72 [SPARK-42056][SQL][PROTOBUF] Add missing options for
Protobuf functions
e5d23afad72 is described below
commit e5d23afad721cf6f3aa39880620954606b01bb38
Author: Raghu Angadi <[email protected]>
AuthorDate: Sat Jan 21 15:55:19 2023 +0900
[SPARK-42056][SQL][PROTOBUF] Add missing options for Protobuf functions
This adds missing options for Protobuf functions in both Scala & Python. We
should be able to pass options for both `from_protobuf()` and `to_protobuf()`.
This PR fixes various gaps:
- In Scala `to_protobuf()` didn't have a way to pass options.
- In Scala `from_protobuf()` that takes Java class name didn't allow
options.
- In Python, `from_protobuf()` that uses Java class name didn't propagate
options.
- In Python `to_protobuf()` didn't pass options.
### Why are the changes needed?
Options are an important part of how Protobuf functions behave (e.g. we need
to set the recursion depth). We need to be able to pass them.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Existing tests.
Closes #39550 from rangadi/protobuf-options.
Authored-by: Raghu Angadi <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/protobuf/CatalystDataToProtobuf.scala | 3 +-
.../org/apache/spark/sql/protobuf/functions.scala | 117 +++++++++++++++++----
python/pyspark/sql/protobuf/functions.py | 64 ++++++-----
3 files changed, 135 insertions(+), 49 deletions(-)
diff --git
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala
index b9f7907ea8c..12561fe51e6 100644
---
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala
+++
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.types.{BinaryType, DataType}
private[protobuf] case class CatalystDataToProtobuf(
child: Expression,
messageName: String,
- descFilePath: Option[String] = None)
+ descFilePath: Option[String] = None,
+ options: Map[String, String] = Map.empty)
extends UnaryExpression {
override def dataType: DataType = BinaryType
diff --git
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
index b48817518d3..4c6e53f6020 100644
---
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
+++
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
@@ -27,16 +27,15 @@ object functions {
/**
* Converts a binary column of Protobuf format into its corresponding
catalyst value. The
- * specified schema must match actual schema of the read data, otherwise the
behavior is
- * undefined: it may fail or return arbitrary result. To deserialize the
data with a compatible
- * and evolved schema, the expected Protobuf schema can be set via the
option protoSchema.
+ * Protobuf definition is provided through Protobuf <i>descriptor file</i>.
*
* @param data
* the binary column.
* @param messageName
- * the protobuf message name to look for in descriptorFile.
+ * the protobuf message name to look for in descriptor file.
* @param descFilePath
- * the protobuf descriptor in Message GeneratedMessageV3 format.
+ * the protobuf descriptor file.
+ * @param options
* @since 3.4.0
*/
@Experimental
@@ -52,16 +51,14 @@ object functions {
/**
* Converts a binary column of Protobuf format into its corresponding
catalyst value. The
- * specified schema must match actual schema of the read data, otherwise the
behavior is
- * undefined: it may fail or return arbitrary result. To deserialize the
data with a compatible
- * and evolved schema, the expected Protobuf schema can be set via the
option protoSchema.
+ * Protobuf definition is provided through Protobuf <i>descriptor file</i>.
*
* @param data
* the binary column.
* @param messageName
- * the protobuf MessageName to look for in descriptorFile.
+ * the protobuf MessageName to look for in descriptor file.
* @param descFilePath
- * the protobuf descriptor in Message GeneratedMessageV3 format.
+ * the protobuf descriptor file.
* @since 3.4.0
*/
@Experimental
@@ -72,33 +69,59 @@ object functions {
}
/**
- * Converts a binary column of Protobuf format into its corresponding
catalyst value. The
- * specified Protobuf class must match the data, otherwise the behavior is
- * undefined: it may fail or return arbitrary result. The jar containing
Java class should be
+ * Converts a binary column of Protobuf format into its corresponding
catalyst value.
+ * `messageClassName` points to Protobuf Java class. The jar containing Java
class should be
* shaded. Specifically, `com.google.protobuf.*` should be shaded to
* `org.sparkproject.spark-protobuf.protobuf.*`.
+ * https://github.com/rangadi/shaded-protobuf-classes is useful to create
shaded jar from
+ * Protobuf files.
*
* @param data
* the binary column.
- * @param shadedMessageClassName
- * The Protobuf class name. E.g.
<code>org.spark.examples.protobuf.ExampleEvent</code>.
+ * @param messageClassName
+ * The full name for Protobuf Java class. E.g.
<code>com.example.protos.ExampleEvent</code>.
* The jar with these classes needs to be shaded as described above.
* @since 3.4.0
*/
@Experimental
- def from_protobuf(data: Column, shadedMessageClassName: String): Column = {
- new Column(ProtobufDataToCatalyst(data.expr, shadedMessageClassName))
+ def from_protobuf(data: Column, messageClassName: String): Column = {
+ new Column(ProtobufDataToCatalyst(data.expr, messageClassName))
}
/**
- * Converts a column into binary of protobuf format.
+ * Converts a binary column of Protobuf format into its corresponding
catalyst value.
+ * `messageClassName` points to Protobuf Java class. The jar containing Java
class should be
+ * shaded. Specifically, `com.google.protobuf.*` should be shaded to
+ * `org.sparkproject.spark-protobuf.protobuf.*`.
+ * https://github.com/rangadi/shaded-protobuf-classes is useful to create
shaded jar from
+ * Protobuf files.
+ *
+ * @param data
+ * the binary column.
+ * @param messageClassName
+ * The full name for Protobuf Java class. E.g.
<code>com.example.protos.ExampleEvent</code>.
+ * The jar with these classes needs to be shaded as described above.
+ * @param options
+ * @since 3.4.0
+ */
+ @Experimental
+ def from_protobuf(
+ data: Column,
+ messageClassName: String,
+ options: java.util.Map[String, String]): Column = {
+ new Column(ProtobufDataToCatalyst(data.expr, messageClassName, None,
options.asScala.toMap))
+ }
+
+ /**
+ * Converts a column into binary of protobuf format. The Protobuf definition
is provided
+ * through Protobuf <i>descriptor file</i>.
*
* @param data
* the data column.
* @param messageName
- * the protobuf MessageName to look for in descriptorFile.
+ * the protobuf MessageName to look for in descriptor file.
* @param descFilePath
- * the protobuf descriptor in Message GeneratedMessageV3 format.
+ * the protobuf descriptor file.
* @since 3.4.0
*/
@Experimental
@@ -106,17 +129,69 @@ object functions {
new Column(CatalystDataToProtobuf(data.expr, messageName,
Some(descFilePath)))
}
+ /**
+ * Converts a column into binary of protobuf format. The Protobuf definition
is provided
+ * through Protobuf <i>descriptor file</i>.
+ *
+ * @param data
+ * the data column.
+ * @param messageName
+ * the protobuf MessageName to look for in descriptor file.
+ * @param descFilePath
+ * the protobuf descriptor file.
+ * @param options
+ * @since 3.4.0
+ */
+ @Experimental
+ def to_protobuf(
+ data: Column,
+ messageName: String,
+ descFilePath: String,
+ options: java.util.Map[String, String]): Column = {
+ new Column(
+ CatalystDataToProtobuf(data.expr, messageName, Some(descFilePath),
options.asScala.toMap)
+ )
+ }
+
/**
* Converts a column into binary of protobuf format.
+ * `messageClassName` points to Protobuf Java class. The jar containing Java
class should be
+ * shaded. Specifically, `com.google.protobuf.*` should be shaded to
+ * `org.sparkproject.spark-protobuf.protobuf.*`.
+ * https://github.com/rangadi/shaded-protobuf-classes is useful to create
shaded jar from
+ * Protobuf files.
*
* @param data
* the data column.
* @param messageClassName
- * The Protobuf class name. E.g.
<code>org.spark.examples.protobuf.ExampleEvent</code>.
+ * The full name for Protobuf Java class. E.g.
<code>com.example.protos.ExampleEvent</code>.
+ * The jar with these classes needs to be shaded as described above.
* @since 3.4.0
*/
@Experimental
def to_protobuf(data: Column, messageClassName: String): Column = {
new Column(CatalystDataToProtobuf(data.expr, messageClassName))
}
+
+ /**
+ * Converts a column into binary of protobuf format.
+ * `messageClassName` points to Protobuf Java class. The jar containing Java
class should be
+ * shaded. Specifically, `com.google.protobuf.*` should be shaded to
+ * `org.sparkproject.spark-protobuf.protobuf.*`.
+ * https://github.com/rangadi/shaded-protobuf-classes is useful to create
shaded jar from
+ * Protobuf files.
+ *
+ * @param data
+ * the data column.
+ * @param messageClassName
+ * The full name for Protobuf Java class. E.g.
<code>com.example.protos.ExampleEvent</code>.
+ * The jar with these classes needs to be shaded as described above.
+ * @param options
+ * @since 3.4.0
+ */
+ @Experimental
+ def to_protobuf(data: Column, messageClassName: String, options:
java.util.Map[String, String])
+ : Column = {
+ new Column(CatalystDataToProtobuf(data.expr, messageClassName, None,
options.asScala.toMap))
+ }
}
diff --git a/python/pyspark/sql/protobuf/functions.py
b/python/pyspark/sql/protobuf/functions.py
index 1c17c37f715..52b806dc6a4 100644
--- a/python/pyspark/sql/protobuf/functions.py
+++ b/python/pyspark/sql/protobuf/functions.py
@@ -37,13 +37,15 @@ def from_protobuf(
) -> Column:
"""
Converts a binary column of Protobuf format into its corresponding
catalyst value.
- The specified schema must match the read data, otherwise the behavior is
undefined:
- it may fail or return arbitrary result. The jar containing Java class
should be shaded.
- Specifically, ``com.google.protobuf.*`` should be shaded to
- ``org.sparkproject.spark-protobuf.protobuf.*``.
+ The Protobuf definition is provided in one of these two ways:
- To deserialize the data with a compatible and evolved schema, the expected
- Protobuf schema can be set via the option protobuf descriptor.
+ - Protobuf descriptor file: E.g. a descriptor file created with
+ `protoc --include_imports --descriptor_set_out=abc.desc abc.proto`
+ - Jar containing Protobuf Java class: The jar containing Java class
should be shaded.
+ Specifically, `com.google.protobuf.*` should be shaded to
+ `org.sparkproject.spark-protobuf.protobuf.*`.
+ https://github.com/rangadi/shaded-protobuf-classes is useful to
create shaded jar from
+ Protobuf files. The jar file can be added with spark-submit option
--jars.
.. versionadded:: 3.4.0
@@ -52,12 +54,11 @@ def from_protobuf(
data : :class:`~pyspark.sql.Column` or str
the binary column.
messageName: str, optional
- the protobuf message name to look for in descriptor file. Or
- The Protobuf class name. E.g.
``org.spark.examples.protobuf.ExampleEvent``,
- without descFilePath parameter.
- Using the spark-submit option --jars, add a messageClassName specific
jar.
- descFilePath : str
- the protobuf descriptor in Message GeneratedMessageV3 format.
+ the protobuf message name to look for in descriptor file, or
+ The Protobuf class name when descFilePath parameter is not set.
+ E.g. `com.example.protos.ExampleEvent`.
+ descFilePath : str, optional
+ The protobuf descriptor file.
options : dict, optional
options to control how the protobuf record is parsed.
@@ -125,7 +126,7 @@ def from_protobuf(
)
else:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.from_protobuf(
- _to_java_column(data), messageName
+ _to_java_column(data), messageName, options or {}
)
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
@@ -135,13 +136,22 @@ def from_protobuf(
def to_protobuf(
- data: "ColumnOrName", messageName: str, descFilePath: Optional[str] = None
+ data: "ColumnOrName",
+ messageName: str,
+ descFilePath: Optional[str] = None,
+ options: Optional[Dict[str, str]] = None,
) -> Column:
"""
- Converts a column into binary of protobuf format. The specified Protobuf
class must match the
- data, otherwise the behavior is undefined: it may fail or return arbitrary
result. The jar
- containing Java class should be shaded. Specifically,
``com.google.protobuf.*`` should be
- shaded to ``org.sparkproject.spark-protobuf.protobuf.*``.
+ Converts a column into binary of protobuf format. The Protobuf definition
is provided in one
+ of these two ways:
+
+ - Protobuf descriptor file: E.g. a descriptor file created with
+ `protoc --include_imports --descriptor_set_out=abc.desc abc.proto`
+ - Jar containing Protobuf Java class: The jar containing Java class
should be shaded.
+ Specifically, `com.google.protobuf.*` should be shaded to
+ `org.sparkproject.spark-protobuf.protobuf.*`.
+ https://github.com/rangadi/shaded-protobuf-classes is useful to
create shaded jar from
+ Protobuf files. The jar file can be added with spark-submit option
--jars.
.. versionadded:: 3.4.0
@@ -150,16 +160,16 @@ def to_protobuf(
data : :class:`~pyspark.sql.Column` or str
the data column.
messageName: str, optional
- the protobuf message name to look for in descriptor file. Or
- The Protobuf class name. E.g.
``org.spark.examples.protobuf.ExampleEvent``,
- without descFilePath parameter.
- Using the spark-submit option --jars, add a messageClassName specific
jar.
- descFilePath : str
- the protobuf descriptor in Message GeneratedMessageV3 format.
+ the protobuf message name to look for in descriptor file, or
+ The Protobuf class name when descFilePath parameter is not set.
+ E.g. `com.example.protos.ExampleEvent`.
+ descFilePath : str, optional
+ the Protobuf descriptor file.
+ options : dict, optional
Notes
-----
- Protobuf functionality is provided as an pluggable external module
+ Protobuf functionality is provided as a pluggable external module
Examples
--------
@@ -207,11 +217,11 @@ def to_protobuf(
try:
if descFilePath is not None:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.to_protobuf(
- _to_java_column(data), messageName, descFilePath
+ _to_java_column(data), messageName, descFilePath, options or {}
)
else:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.to_protobuf(
- _to_java_column(data), messageName
+ _to_java_column(data), messageName, options or {}
)
except TypeError as e:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]