This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 0191a5bde08 [SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client
0191a5bde08 is described below
commit 0191a5bde082c350b0eda07cf93953c497fd273b
Author: yangjie01 <[email protected]>
AuthorDate: Thu Mar 9 14:59:32 2023 +0800
[SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client
### What changes were proposed in this pull request?
This PR adds a new proto message:
```
message Parse {
  // (Required) Input relation to Parse. The input is expected to have single text column.
  Relation input = 1;

  // (Required) The expected format of the text.
  ParseFormat format = 2;

  // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
  optional DataType schema = 3;

  // Options for the csv/json parser. The map key is case insensitive.
  map<string, string> options = 4;

  enum ParseFormat {
    PARSE_FORMAT_UNSPECIFIED = 0;
    PARSE_FORMAT_CSV = 1;
    PARSE_FORMAT_JSON = 2;
  }
}
```
and implements CSV/JSON parsing functions for the Scala client.
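For illustration, a minimal usage sketch of the new reader methods from the Scala client, assuming a connected `spark` session; the sample rows and column names mirror the tests added in this PR:
```
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val session = spark
import session.implicits._

// JSON lines: without an explicit schema, Spark infers it from the input.
val jsonDs = Seq("""{"name":"Kong","age":73,"city":"Shandong"}""").toDS()
val jsonDf = spark.read.json(jsonDs)

// CSV rows: an explicit schema and reader options are carried to the server via the new Parse relation.
val csvDs = Seq("name,age", "\"Meng\",84").toDS()
val csvDf = spark.read
  .schema(new StructType().add("name", StringType).add("age", LongType))
  .option("header", "true")
  .csv(csvDs)
```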
### Why are the changes needed?
Adds Spark Connect JVM client API coverage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
- Manually checked with Scala 2.13
Closes #40332 from LuciferYang/SPARK-42690.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 07f71d2ba61325331aabbc686ce30cb9012a6643)
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../org/apache/spark/sql/DataFrameReader.scala | 52 +++++
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 64 ++++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 15 ++
.../CheckConnectJvmClientCompatibility.scala | 1 -
.../main/protobuf/spark/connect/relations.proto | 19 ++
.../explain-results/csv_from_dataset.explain | 1 +
.../explain-results/json_from_dataset.explain | 1 +
.../query-tests/queries/csv_from_dataset.json | 38 ++++
.../query-tests/queries/csv_from_dataset.proto.bin | Bin 0 -> 156 bytes
.../query-tests/queries/json_from_dataset.json | 38 ++++
.../queries/json_from_dataset.proto.bin | Bin 0 -> 167 bytes
.../sql/connect/planner/SparkConnectPlanner.scala | 26 +++
python/pyspark/sql/connect/proto/relations_pb2.py | 248 ++++++++++++---------
python/pyspark/sql/connect/proto/relations_pb2.pyi | 97 ++++++++
14 files changed, 491 insertions(+), 109 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index d5641fb303a..ad921bcc4e3 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,8 +22,10 @@ import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.annotation.Stable
+import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.types.StructType
/**
@@ -324,6 +326,20 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
format("json").load(paths: _*)
}
+ /**
+ * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
+ * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+ *
+ * Unless the schema is specified using `schema` function, this function goes through the input
+ * once to determine the input schema.
+ *
+ * @param jsonDataset
+ * input Dataset with one JSON object per record
+ * @since 3.4.0
+ */
+ def json(jsonDataset: Dataset[String]): DataFrame =
+ parse(jsonDataset, ParseFormat.PARSE_FORMAT_JSON)
+
/**
* Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
* overloaded `csv()` method for more details.
@@ -351,6 +367,29 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
@scala.annotation.varargs
def csv(paths: String*): DataFrame = format("csv").load(paths: _*)
+ /**
+ * Loads an `Dataset[String]` storing CSV rows and returns the result as a `DataFrame`.
+ *
+ * If the schema is not specified using `schema` function and `inferSchema` option is enabled,
+ * this function goes through the input once to determine the input schema.
+ *
+ * If the schema is not specified using `schema` function and `inferSchema` option is disabled,
+ * it determines the columns as string types and it reads only the first line to determine the
+ * names and the number of fields.
+ *
+ * If the enforceSchema is set to `false`, only the CSV header in the first line is checked to
+ * conform specified or inferred schema.
+ *
+ * @note
+ * if `header` option is set to `true` when calling this API, all lines same with the header
+ * will be removed if exists.
+ * @param csvDataset
+ * input Dataset with one CSV row per record
+ * @since 3.4.0
+ */
+ def csv(csvDataset: Dataset[String]): DataFrame =
+ parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
+
/**
* Loads a Parquet file, returning the result as a `DataFrame`. See the documentation on the
* other overloaded `parquet()` method for more details.
@@ -504,6 +543,19 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
}
}
+ private def parse(ds: Dataset[String], format: ParseFormat): DataFrame = {
+ sparkSession.newDataFrame { builder =>
+ val parseBuilder = builder.getParseBuilder
+ .setInput(ds.plan.getRoot)
+ .setFormat(format)
+ userSpecifiedSchema.foreach(schema =>
+ parseBuilder.setSchema(DataTypeProtoConverter.toConnectProtoType(schema)))
+ extraOptions.foreach { case (k, v) =>
+ parseBuilder.putOptions(k, v)
+ }
+ }
+ }
+
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 780280144b5..466a51841d4 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -27,6 +27,9 @@ import org.apache.commons.io.output.TeeOutputStream
import org.scalactic.TolerantNumerics
import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
import org.apache.spark.sql.functions.{aggregate, array, broadcast, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
import org.apache.spark.sql.types._
@@ -644,6 +647,67 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
.collect()
assert(result sameElements expected)
}
+
+ test("json from Dataset[String] inferSchema") {
+ val session = spark
+ import session.implicits._
+ val expected = Seq(
+ new GenericRowWithSchema(
+ Array(73, "Shandong", "Kong"),
+ new StructType().add("age", LongType).add("city",
StringType).add("name", StringType)))
+ val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
+ val result = spark.read.option("allowSingleQuotes", "true").json(ds)
+ checkSameResult(expected, result)
+ }
+
+ test("json from Dataset[String] with schema") {
+ val session = spark
+ import session.implicits._
+ val schema = new StructType().add("city", StringType).add("name",
StringType)
+ val expected = Seq(new GenericRowWithSchema(Array("Shandong", "Kong"),
schema))
+ val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
+ val result = spark.read.schema(schema).option("allowSingleQuotes", "true").json(ds)
+ checkSameResult(expected, result)
+ }
+
+ test("json from Dataset[String] with invalid schema") {
+ val message = intercept[ParseException] {
+ spark.read.schema("123").json(spark.createDataset(Seq.empty[String])(StringEncoder))
+ }.getMessage
+ assert(message.contains("PARSE_SYNTAX_ERROR"))
+ }
+
+ test("csv from Dataset[String] inferSchema") {
+ val session = spark
+ import session.implicits._
+ val expected = Seq(
+ new GenericRowWithSchema(
+ Array("Meng", 84, "Shandong"),
+ new StructType().add("name", StringType).add("age",
LongType).add("city", StringType)))
+ val ds = Seq("name,age,city", """"Meng",84,"Shandong"""").toDS()
+ val result = spark.read
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .csv(ds)
+ checkSameResult(expected, result)
+ }
+
+ test("csv from Dataset[String] with schema") {
+ val session = spark
+ import session.implicits._
+ val schema = new StructType().add("name", StringType).add("age", LongType)
+ val expected = Seq(new GenericRowWithSchema(Array("Meng", 84), schema))
+ val ds = Seq(""""Meng",84,"Shandong"""").toDS()
+ val result = spark.read.schema(schema).csv(ds)
+ checkSameResult(expected, result)
+ }
+
+ test("csv from Dataset[String] with invalid schema") {
+ val message = intercept[ParseException] {
+ spark.read.schema("123").csv(spark.createDataset(Seq.empty[String])(StringEncoder))
+ }.getMessage
+ assert(message.contains("PARSE_SYNTAX_ERROR"))
+ }
}
private[sql] case class MyType(id: Long, a: Double, b: Double)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 56c5111912a..0d295d17296 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{functions => fn}
import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.connect.client.SparkConnectClient
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
import org.apache.spark.sql.expressions.Window
@@ -254,6 +255,13 @@ class PlanGenerationTestSuite
session.read.json(testDataPath.resolve("people.json").toString)
}
+ test("json from dataset") {
+ session.read
+ .schema(new StructType().add("c1", StringType).add("c2", IntegerType))
+ .option("allowSingleQuotes", "true")
+ .json(session.emptyDataset(StringEncoder))
+ }
+
test("toJSON") {
complex.toJSON
}
@@ -262,6 +270,13 @@ class PlanGenerationTestSuite
session.read.csv(testDataPath.resolve("people.csv").toString)
}
+ test("csv from dataset") {
+ session.read
+ .schema(new StructType().add("c1", StringType).add("c2", IntegerType))
+ .option("header", "true")
+ .csv(session.emptyDataset(StringEncoder))
+ }
+
test("read parquet") {
session.read.parquet(testDataPath.resolve("users.parquet").toString)
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 868e7ae7b74..ae6c6c86fec 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -131,7 +131,6 @@ object CheckConnectJvmClientCompatibility {
// DataFrame Reader & Writer
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.csv"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index ab67ade9fb7..97fc3a474f3 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -62,6 +62,7 @@ message Relation {
RepartitionByExpression repartition_by_expression = 27;
FrameMap frame_map = 28;
CollectMetrics collect_metrics = 29;
+ Parse parse = 30;
// NA functions
NAFill fill_na = 90;
@@ -798,3 +799,21 @@ message CollectMetrics {
// (Required) The metric sequence.
repeated Expression metrics = 3;
}
+
+message Parse {
+ // (Required) Input relation to Parse. The input is expected to have single text column.
+ Relation input = 1;
+ // (Required) The expected format of the text.
+ ParseFormat format = 2;
+
+ // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
+ optional DataType schema = 3;
+
+ // Options for the csv/json parser. The map key is case insensitive.
+ map<string, string> options = 4;
+ enum ParseFormat {
+ PARSE_FORMAT_UNSPECIFIED = 0;
+ PARSE_FORMAT_CSV = 1;
+ PARSE_FORMAT_JSON = 2;
+ }
+}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain
new file mode 100644
index 00000000000..9fbaa9fcede
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain
@@ -0,0 +1 @@
+LogicalRDD [c1#0, c2#0], false
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain
new file mode 100644
index 00000000000..9fbaa9fcede
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain
@@ -0,0 +1 @@
+LogicalRDD [c1#0, c2#0], false
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json
new file mode 100644
index 00000000000..d34fcb6f758
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json
@@ -0,0 +1,38 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "parse": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema":
"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
+ }
+ },
+ "format": "PARSE_FORMAT_CSV",
+ "schema": {
+ "struct": {
+ "fields": [{
+ "name": "c1",
+ "dataType": {
+ "string": {
+ }
+ },
+ "nullable": true
+ }, {
+ "name": "c2",
+ "dataType": {
+ "integer": {
+ }
+ },
+ "nullable": true
+ }]
+ }
+ },
+ "options": {
+ "header": "true"
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin
new file mode 100644
index 00000000000..5f8bd50685c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json
new file mode 100644
index 00000000000..d6f992d09a5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json
@@ -0,0 +1,38 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "parse": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema":
"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
+ }
+ },
+ "format": "PARSE_FORMAT_JSON",
+ "schema": {
+ "struct": {
+ "fields": [{
+ "name": "c1",
+ "dataType": {
+ "string": {
+ }
+ },
+ "nullable": true
+ }, {
+ "name": "c2",
+ "dataType": {
+ "integer": {
+ }
+ },
+ "nullable": true
+ }]
+ }
+ },
+ "options": {
+ "allowsinglequotes": "true"
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin
new file mode 100644
index 00000000000..0fce9d9ff8c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin differ
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b51dbfa6602..9a8402a1e98 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -29,6 +29,7 @@ import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{ExecutePlanResponse, SqlCommand}
import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
+import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.sql.{Column, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier, FunctionIdentifier}
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, MultiAlias, UnresolvedAlias, UnresolvedAttribute, UnresolvedExtractValue, UnresolvedFunction, UnresolvedRegex, UnresolvedRelation, UnresolvedStar}
@@ -117,6 +118,7 @@ class SparkConnectPlanner(val session: SparkSession) {
transformFrameMap(rel.getFrameMap)
case proto.Relation.RelTypeCase.COLLECT_METRICS =>
transformCollectMetrics(rel.getCollectMetrics)
+ case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.")
@@ -733,6 +735,30 @@ class SparkConnectPlanner(val session: SparkSession) {
}
}
+ private def transformParse(rel: proto.Parse): LogicalPlan = {
+ def dataFrameReader = {
+ val localMap = CaseInsensitiveMap[String](rel.getOptionsMap.asScala.toMap)
+ val reader = session.read
+ if (rel.hasSchema) {
+ DataTypeProtoConverter.toCatalystType(rel.getSchema) match {
+ case s: StructType => reader.schema(s)
+ case other => throw InvalidPlanInput(s"Invalid schema dataType $other")
+ }
+ }
+ localMap.foreach { case (key, value) => reader.option(key, value) }
+ reader
+ }
+ def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING)
+
+ rel.getFormat match {
+ case ParseFormat.PARSE_FORMAT_CSV =>
+ dataFrameReader.csv(ds).queryExecution.analyzed
+ case ParseFormat.PARSE_FORMAT_JSON =>
+ dataFrameReader.json(ds).queryExecution.analyzed
+ case _ => throw InvalidPlanInput("Does not support " + rel.getFormat.name())
+ }
+ }
+
private def transformFilter(rel: proto.Filter): LogicalPlan = {
assert(rel.hasInput)
val baseRel = transformRelation(rel.getInput)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index e577749c3ed..81fa3916c5a 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xfb\x12\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xa9\x13\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
)
@@ -93,9 +93,12 @@ _TOSCHEMA = DESCRIPTOR.message_types_by_name["ToSchema"]
_REPARTITIONBYEXPRESSION = DESCRIPTOR.message_types_by_name["RepartitionByExpression"]
_FRAMEMAP = DESCRIPTOR.message_types_by_name["FrameMap"]
_COLLECTMETRICS = DESCRIPTOR.message_types_by_name["CollectMetrics"]
+_PARSE = DESCRIPTOR.message_types_by_name["Parse"]
+_PARSE_OPTIONSENTRY = _PARSE.nested_types_by_name["OptionsEntry"]
_JOIN_JOINTYPE = _JOIN.enum_types_by_name["JoinType"]
_SETOPERATION_SETOPTYPE = _SETOPERATION.enum_types_by_name["SetOpType"]
_AGGREGATE_GROUPTYPE = _AGGREGATE.enum_types_by_name["GroupType"]
+_PARSE_PARSEFORMAT = _PARSE.enum_types_by_name["ParseFormat"]
Relation = _reflection.GeneratedProtocolMessageType(
"Relation",
(_message.Message,),
@@ -648,6 +651,27 @@ CollectMetrics = _reflection.GeneratedProtocolMessageType(
)
_sym_db.RegisterMessage(CollectMetrics)
+Parse = _reflection.GeneratedProtocolMessageType(
+ "Parse",
+ (_message.Message,),
+ {
+ "OptionsEntry": _reflection.GeneratedProtocolMessageType(
+ "OptionsEntry",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _PARSE_OPTIONSENTRY,
+ "__module__": "spark.connect.relations_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.Parse.OptionsEntry)
+ },
+ ),
+ "DESCRIPTOR": _PARSE,
+ "__module__": "spark.connect.relations_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.Parse)
+ },
+)
+_sym_db.RegisterMessage(Parse)
+_sym_db.RegisterMessage(Parse.OptionsEntry)
+
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
@@ -658,112 +682,120 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001"
_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._options = None
_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_options = b"8\001"
+ _PARSE_OPTIONSENTRY._options = None
+ _PARSE_OPTIONSENTRY._serialized_options = b"8\001"
_RELATION._serialized_start = 165
- _RELATION._serialized_end = 2592
- _UNKNOWN._serialized_start = 2594
- _UNKNOWN._serialized_end = 2603
- _RELATIONCOMMON._serialized_start = 2605
- _RELATIONCOMMON._serialized_end = 2696
- _SQL._serialized_start = 2699
- _SQL._serialized_end = 2833
- _SQL_ARGSENTRY._serialized_start = 2778
- _SQL_ARGSENTRY._serialized_end = 2833
- _READ._serialized_start = 2836
- _READ._serialized_end = 3332
- _READ_NAMEDTABLE._serialized_start = 2978
- _READ_NAMEDTABLE._serialized_end = 3039
- _READ_DATASOURCE._serialized_start = 3042
- _READ_DATASOURCE._serialized_end = 3319
- _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3239
- _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3297
- _PROJECT._serialized_start = 3334
- _PROJECT._serialized_end = 3451
- _FILTER._serialized_start = 3453
- _FILTER._serialized_end = 3565
- _JOIN._serialized_start = 3568
- _JOIN._serialized_end = 4039
- _JOIN_JOINTYPE._serialized_start = 3831
- _JOIN_JOINTYPE._serialized_end = 4039
- _SETOPERATION._serialized_start = 4042
- _SETOPERATION._serialized_end = 4521
- _SETOPERATION_SETOPTYPE._serialized_start = 4358
- _SETOPERATION_SETOPTYPE._serialized_end = 4472
- _LIMIT._serialized_start = 4523
- _LIMIT._serialized_end = 4599
- _OFFSET._serialized_start = 4601
- _OFFSET._serialized_end = 4680
- _TAIL._serialized_start = 4682
- _TAIL._serialized_end = 4757
- _AGGREGATE._serialized_start = 4760
- _AGGREGATE._serialized_end = 5342
- _AGGREGATE_PIVOT._serialized_start = 5099
- _AGGREGATE_PIVOT._serialized_end = 5210
- _AGGREGATE_GROUPTYPE._serialized_start = 5213
- _AGGREGATE_GROUPTYPE._serialized_end = 5342
- _SORT._serialized_start = 5345
- _SORT._serialized_end = 5505
- _DROP._serialized_start = 5508
- _DROP._serialized_end = 5649
- _DEDUPLICATE._serialized_start = 5652
- _DEDUPLICATE._serialized_end = 5823
- _LOCALRELATION._serialized_start = 5825
- _LOCALRELATION._serialized_end = 5914
- _SAMPLE._serialized_start = 5917
- _SAMPLE._serialized_end = 6190
- _RANGE._serialized_start = 6193
- _RANGE._serialized_end = 6338
- _SUBQUERYALIAS._serialized_start = 6340
- _SUBQUERYALIAS._serialized_end = 6454
- _REPARTITION._serialized_start = 6457
- _REPARTITION._serialized_end = 6599
- _SHOWSTRING._serialized_start = 6602
- _SHOWSTRING._serialized_end = 6744
- _STATSUMMARY._serialized_start = 6746
- _STATSUMMARY._serialized_end = 6838
- _STATDESCRIBE._serialized_start = 6840
- _STATDESCRIBE._serialized_end = 6921
- _STATCROSSTAB._serialized_start = 6923
- _STATCROSSTAB._serialized_end = 7024
- _STATCOV._serialized_start = 7026
- _STATCOV._serialized_end = 7122
- _STATCORR._serialized_start = 7125
- _STATCORR._serialized_end = 7262
- _STATAPPROXQUANTILE._serialized_start = 7265
- _STATAPPROXQUANTILE._serialized_end = 7429
- _STATFREQITEMS._serialized_start = 7431
- _STATFREQITEMS._serialized_end = 7556
- _STATSAMPLEBY._serialized_start = 7559
- _STATSAMPLEBY._serialized_end = 7868
- _STATSAMPLEBY_FRACTION._serialized_start = 7760
- _STATSAMPLEBY_FRACTION._serialized_end = 7859
- _NAFILL._serialized_start = 7871
- _NAFILL._serialized_end = 8005
- _NADROP._serialized_start = 8008
- _NADROP._serialized_end = 8142
- _NAREPLACE._serialized_start = 8145
- _NAREPLACE._serialized_end = 8441
- _NAREPLACE_REPLACEMENT._serialized_start = 8300
- _NAREPLACE_REPLACEMENT._serialized_end = 8441
- _TODF._serialized_start = 8443
- _TODF._serialized_end = 8531
- _WITHCOLUMNSRENAMED._serialized_start = 8534
- _WITHCOLUMNSRENAMED._serialized_end = 8773
- _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 8706
- _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 8773
- _WITHCOLUMNS._serialized_start = 8775
- _WITHCOLUMNS._serialized_end = 8894
- _HINT._serialized_start = 8897
- _HINT._serialized_end = 9029
- _UNPIVOT._serialized_start = 9032
- _UNPIVOT._serialized_end = 9359
- _UNPIVOT_VALUES._serialized_start = 9289
- _UNPIVOT_VALUES._serialized_end = 9348
- _TOSCHEMA._serialized_start = 9361
- _TOSCHEMA._serialized_end = 9467
- _REPARTITIONBYEXPRESSION._serialized_start = 9470
- _REPARTITIONBYEXPRESSION._serialized_end = 9673
- _FRAMEMAP._serialized_start = 9675
- _FRAMEMAP._serialized_end = 9800
- _COLLECTMETRICS._serialized_start = 9803
- _COLLECTMETRICS._serialized_end = 9939
+ _RELATION._serialized_end = 2638
+ _UNKNOWN._serialized_start = 2640
+ _UNKNOWN._serialized_end = 2649
+ _RELATIONCOMMON._serialized_start = 2651
+ _RELATIONCOMMON._serialized_end = 2742
+ _SQL._serialized_start = 2745
+ _SQL._serialized_end = 2879
+ _SQL_ARGSENTRY._serialized_start = 2824
+ _SQL_ARGSENTRY._serialized_end = 2879
+ _READ._serialized_start = 2882
+ _READ._serialized_end = 3378
+ _READ_NAMEDTABLE._serialized_start = 3024
+ _READ_NAMEDTABLE._serialized_end = 3085
+ _READ_DATASOURCE._serialized_start = 3088
+ _READ_DATASOURCE._serialized_end = 3365
+ _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3285
+ _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3343
+ _PROJECT._serialized_start = 3380
+ _PROJECT._serialized_end = 3497
+ _FILTER._serialized_start = 3499
+ _FILTER._serialized_end = 3611
+ _JOIN._serialized_start = 3614
+ _JOIN._serialized_end = 4085
+ _JOIN_JOINTYPE._serialized_start = 3877
+ _JOIN_JOINTYPE._serialized_end = 4085
+ _SETOPERATION._serialized_start = 4088
+ _SETOPERATION._serialized_end = 4567
+ _SETOPERATION_SETOPTYPE._serialized_start = 4404
+ _SETOPERATION_SETOPTYPE._serialized_end = 4518
+ _LIMIT._serialized_start = 4569
+ _LIMIT._serialized_end = 4645
+ _OFFSET._serialized_start = 4647
+ _OFFSET._serialized_end = 4726
+ _TAIL._serialized_start = 4728
+ _TAIL._serialized_end = 4803
+ _AGGREGATE._serialized_start = 4806
+ _AGGREGATE._serialized_end = 5388
+ _AGGREGATE_PIVOT._serialized_start = 5145
+ _AGGREGATE_PIVOT._serialized_end = 5256
+ _AGGREGATE_GROUPTYPE._serialized_start = 5259
+ _AGGREGATE_GROUPTYPE._serialized_end = 5388
+ _SORT._serialized_start = 5391
+ _SORT._serialized_end = 5551
+ _DROP._serialized_start = 5554
+ _DROP._serialized_end = 5695
+ _DEDUPLICATE._serialized_start = 5698
+ _DEDUPLICATE._serialized_end = 5869
+ _LOCALRELATION._serialized_start = 5871
+ _LOCALRELATION._serialized_end = 5960
+ _SAMPLE._serialized_start = 5963
+ _SAMPLE._serialized_end = 6236
+ _RANGE._serialized_start = 6239
+ _RANGE._serialized_end = 6384
+ _SUBQUERYALIAS._serialized_start = 6386
+ _SUBQUERYALIAS._serialized_end = 6500
+ _REPARTITION._serialized_start = 6503
+ _REPARTITION._serialized_end = 6645
+ _SHOWSTRING._serialized_start = 6648
+ _SHOWSTRING._serialized_end = 6790
+ _STATSUMMARY._serialized_start = 6792
+ _STATSUMMARY._serialized_end = 6884
+ _STATDESCRIBE._serialized_start = 6886
+ _STATDESCRIBE._serialized_end = 6967
+ _STATCROSSTAB._serialized_start = 6969
+ _STATCROSSTAB._serialized_end = 7070
+ _STATCOV._serialized_start = 7072
+ _STATCOV._serialized_end = 7168
+ _STATCORR._serialized_start = 7171
+ _STATCORR._serialized_end = 7308
+ _STATAPPROXQUANTILE._serialized_start = 7311
+ _STATAPPROXQUANTILE._serialized_end = 7475
+ _STATFREQITEMS._serialized_start = 7477
+ _STATFREQITEMS._serialized_end = 7602
+ _STATSAMPLEBY._serialized_start = 7605
+ _STATSAMPLEBY._serialized_end = 7914
+ _STATSAMPLEBY_FRACTION._serialized_start = 7806
+ _STATSAMPLEBY_FRACTION._serialized_end = 7905
+ _NAFILL._serialized_start = 7917
+ _NAFILL._serialized_end = 8051
+ _NADROP._serialized_start = 8054
+ _NADROP._serialized_end = 8188
+ _NAREPLACE._serialized_start = 8191
+ _NAREPLACE._serialized_end = 8487
+ _NAREPLACE_REPLACEMENT._serialized_start = 8346
+ _NAREPLACE_REPLACEMENT._serialized_end = 8487
+ _TODF._serialized_start = 8489
+ _TODF._serialized_end = 8577
+ _WITHCOLUMNSRENAMED._serialized_start = 8580
+ _WITHCOLUMNSRENAMED._serialized_end = 8819
+ _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 8752
+ _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 8819
+ _WITHCOLUMNS._serialized_start = 8821
+ _WITHCOLUMNS._serialized_end = 8940
+ _HINT._serialized_start = 8943
+ _HINT._serialized_end = 9075
+ _UNPIVOT._serialized_start = 9078
+ _UNPIVOT._serialized_end = 9405
+ _UNPIVOT_VALUES._serialized_start = 9335
+ _UNPIVOT_VALUES._serialized_end = 9394
+ _TOSCHEMA._serialized_start = 9407
+ _TOSCHEMA._serialized_end = 9513
+ _REPARTITIONBYEXPRESSION._serialized_start = 9516
+ _REPARTITIONBYEXPRESSION._serialized_end = 9719
+ _FRAMEMAP._serialized_start = 9721
+ _FRAMEMAP._serialized_end = 9846
+ _COLLECTMETRICS._serialized_start = 9849
+ _COLLECTMETRICS._serialized_end = 9985
+ _PARSE._serialized_start = 9988
+ _PARSE._serialized_end = 10376
+ _PARSE_OPTIONSENTRY._serialized_start = 3285
+ _PARSE_OPTIONSENTRY._serialized_end = 3343
+ _PARSE_PARSEFORMAT._serialized_start = 10277
+ _PARSE_PARSEFORMAT._serialized_end = 10365
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index d434451082e..edaab7bcb77 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -91,6 +91,7 @@ class Relation(google.protobuf.message.Message):
REPARTITION_BY_EXPRESSION_FIELD_NUMBER: builtins.int
FRAME_MAP_FIELD_NUMBER: builtins.int
COLLECT_METRICS_FIELD_NUMBER: builtins.int
+ PARSE_FIELD_NUMBER: builtins.int
FILL_NA_FIELD_NUMBER: builtins.int
DROP_NA_FIELD_NUMBER: builtins.int
REPLACE_FIELD_NUMBER: builtins.int
@@ -164,6 +165,8 @@ class Relation(google.protobuf.message.Message):
@property
def collect_metrics(self) -> global___CollectMetrics: ...
@property
+ def parse(self) -> global___Parse: ...
+ @property
def fill_na(self) -> global___NAFill:
"""NA functions"""
@property
@@ -229,6 +232,7 @@ class Relation(google.protobuf.message.Message):
repartition_by_expression: global___RepartitionByExpression | None = ...,
frame_map: global___FrameMap | None = ...,
collect_metrics: global___CollectMetrics | None = ...,
+ parse: global___Parse | None = ...,
fill_na: global___NAFill | None = ...,
drop_na: global___NADrop | None = ...,
replace: global___NAReplace | None = ...,
@@ -291,6 +295,8 @@ class Relation(google.protobuf.message.Message):
b"local_relation",
"offset",
b"offset",
+ "parse",
+ b"parse",
"project",
b"project",
"range",
@@ -384,6 +390,8 @@ class Relation(google.protobuf.message.Message):
b"local_relation",
"offset",
b"offset",
+ "parse",
+ b"parse",
"project",
b"project",
"range",
@@ -461,6 +469,7 @@ class Relation(google.protobuf.message.Message):
"repartition_by_expression",
"frame_map",
"collect_metrics",
+ "parse",
"fill_na",
"drop_na",
"replace",
@@ -2763,3 +2772,91 @@ class CollectMetrics(google.protobuf.message.Message):
) -> None: ...
global___CollectMetrics = CollectMetrics
+
+class Parse(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ class _ParseFormat:
+ ValueType = typing.NewType("ValueType", builtins.int)
+ V: typing_extensions.TypeAlias = ValueType
+
+ class _ParseFormatEnumTypeWrapper(
+ google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Parse._ParseFormat.ValueType],
+ builtins.type,
+ ): # noqa: F821
+ DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+ PARSE_FORMAT_UNSPECIFIED: Parse._ParseFormat.ValueType # 0
+ PARSE_FORMAT_CSV: Parse._ParseFormat.ValueType # 1
+ PARSE_FORMAT_JSON: Parse._ParseFormat.ValueType # 2
+
+ class ParseFormat(_ParseFormat, metaclass=_ParseFormatEnumTypeWrapper): ...
+ PARSE_FORMAT_UNSPECIFIED: Parse.ParseFormat.ValueType # 0
+ PARSE_FORMAT_CSV: Parse.ParseFormat.ValueType # 1
+ PARSE_FORMAT_JSON: Parse.ParseFormat.ValueType # 2
+
+ class OptionsEntry(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ KEY_FIELD_NUMBER: builtins.int
+ VALUE_FIELD_NUMBER: builtins.int
+ key: builtins.str
+ value: builtins.str
+ def __init__(
+ self,
+ *,
+ key: builtins.str = ...,
+ value: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["key", b"key",
"value", b"value"]
+ ) -> None: ...
+
+ INPUT_FIELD_NUMBER: builtins.int
+ FORMAT_FIELD_NUMBER: builtins.int
+ SCHEMA_FIELD_NUMBER: builtins.int
+ OPTIONS_FIELD_NUMBER: builtins.int
+ @property
+ def input(self) -> global___Relation:
+ """(Required) Input relation to Parse. The input is expected to have
single text column."""
+ format: global___Parse.ParseFormat.ValueType
+ """(Required) The expected format of the text."""
+ @property
+ def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType:
+ """(Optional) DataType representing the schema. If not set, Spark will
infer the schema."""
+ @property
+ def options(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
+ """Options for the csv/json parser. The map key is case insensitive."""
+ def __init__(
+ self,
+ *,
+ input: global___Relation | None = ...,
+ format: global___Parse.ParseFormat.ValueType = ...,
+ schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+ options: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_schema", b"_schema", "input", b"input", "schema", b"schema"
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_schema",
+ b"_schema",
+ "format",
+ b"format",
+ "input",
+ b"input",
+ "options",
+ b"options",
+ "schema",
+ b"schema",
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
+ ) -> typing_extensions.Literal["schema"] | None: ...
+
+global___Parse = Parse