This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 01dc5e6abaf [SPARK-42786][CONNECT] Typed Select
01dc5e6abaf is described below
commit 01dc5e6abafeb0bee2049d3e9da73fb89703d958
Author: Zhen Li <[email protected]>
AuthorDate: Tue Mar 21 23:34:08 2023 -0400
[SPARK-42786][CONNECT] Typed Select
### What changes were proposed in this pull request?
Implement the typed `select` methods (overloads taking 2 to 5 `TypedColumn` arguments) in the Spark Connect Scala client's `Dataset`.
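For example (a hedged usage sketch, not part of this patch; `MyType` and `generateMyTypeColumns` are helpers from the E2E test suite below):

```scala
import org.apache.spark.sql.functions.struct

val session = spark
import session.implicits._

// Each TypedColumn carries its own encoder, so selecting several of them
// yields a typed Dataset of tuples instead of an untyped DataFrame.
val s = struct(generateMyTypeColumns: _*).as[MyType]
val ds: Dataset[(MyType, MyType, MyType)] = session.range(3).select(s, s, s)
```

Under the hood all overloads funnel into a single `selectUntyped` helper; the typed variants build a tuple `ProductEncoder` from the per-column encoders and cast the result.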
### Why are the changes needed?
Adds more `Dataset` APIs, bringing the Spark Connect Scala client closer to parity with the existing `Dataset` API.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit and E2E tests
Closes #40413 from zhenlineo/select-typed.
Authored-by: Zhen Li <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 79 ++++++++++++++++++--
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 31 +++++++-
.../apache/spark/sql/PlanGenerationTestSuite.scala | 35 +++++++++
.../CheckConnectJvmClientCompatibility.scala | 3 +-
.../explain-results/select_typed_2-arg.explain | 2 +
.../explain-results/select_typed_3-arg.explain | 2 +
.../explain-results/select_typed_4-arg.explain | 2 +
.../explain-results/select_typed_5-arg.explain | 2 +
.../query-tests/queries/select_typed_2-arg.json | 42 +++++++++++
.../queries/select_typed_2-arg.proto.bin | Bin 0 -> 101 bytes
.../query-tests/queries/select_typed_3-arg.json | 55 ++++++++++++++
.../queries/select_typed_3-arg.proto.bin | Bin 0 -> 128 bytes
.../query-tests/queries/select_typed_4-arg.json | 68 +++++++++++++++++
.../queries/select_typed_4-arg.proto.bin | Bin 0 -> 157 bytes
.../query-tests/queries/select_typed_5-arg.json | 81 +++++++++++++++++++++
.../queries/select_typed_5-arg.proto.bin | Bin 0 -> 182 bytes
.../sql/catalyst/encoders/AgnosticEncoder.scala | 14 ++++
17 files changed, 407 insertions(+), 9 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index fdc994b2d90..cc9f66a8ba0 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.connect.proto
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveLongEncoder, StringEncoder, UnboundRowEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveLongEncoder, ProductEncoder, StringEncoder, UnboundRowEncoder}
import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.connect.client.SparkResult
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
@@ -1020,11 +1020,8 @@ class Dataset[T] private[sql] (
* @since 3.4.0
*/
@scala.annotation.varargs
- def select(cols: Column*): DataFrame = sparkSession.newDataFrame { builder =>
- builder.getProjectBuilder
- .setInput(plan.getRoot)
- .addAllExpressions(cols.map(_.expr).asJava)
- }
+ def select(cols: Column*): DataFrame =
+ selectUntyped(UnboundRowEncoder, cols).asInstanceOf[DataFrame]
/**
* Selects a set of columns. This is a variant of `select` that can only select existing columns
@@ -1084,6 +1081,76 @@ class Dataset[T] private[sql] (
}
}
+ /**
+ * Internal helper function for building typed selects that return tuples. For simplicity and
+ * code reuse, we do this without the help of the type system and then use helper functions that
+ * cast appropriately for the user facing interface.
+ */
+ private def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
+ val encoder = ProductEncoder.tuple(columns.map(_.encoder))
+ selectUntyped(encoder, columns)
+ }
+
+ /**
+ * Internal helper function for all select methods. The only difference between the select
+ * methods and typed select methods is the encoder used to build the return dataset.
+ */
+ private def selectUntyped(encoder: AgnosticEncoder[_], cols: Seq[Column]): Dataset[_] = {
+ sparkSession.newDataset(encoder) { builder =>
+ builder.getProjectBuilder
+ .setInput(plan.getRoot)
+ .addAllExpressions(cols.map(_.expr).asJava)
+ }
+ }
+
+ /**
+ * Returns a new Dataset by computing the given [[Column]] expressions for each element.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] =
+ selectUntyped(c1, c2).asInstanceOf[Dataset[(U1, U2)]]
+
+ /**
+ * Returns a new Dataset by computing the given [[Column]] expressions for each element.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def select[U1, U2, U3](
+ c1: TypedColumn[T, U1],
+ c2: TypedColumn[T, U2],
+ c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)] =
+ selectUntyped(c1, c2, c3).asInstanceOf[Dataset[(U1, U2, U3)]]
+
+ /**
+ * Returns a new Dataset by computing the given [[Column]] expressions for each element.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def select[U1, U2, U3, U4](
+ c1: TypedColumn[T, U1],
+ c2: TypedColumn[T, U2],
+ c3: TypedColumn[T, U3],
+ c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)] =
+ selectUntyped(c1, c2, c3, c4).asInstanceOf[Dataset[(U1, U2, U3, U4)]]
+
+ /**
+ * Returns a new Dataset by computing the given [[Column]] expressions for each element.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def select[U1, U2, U3, U4, U5](
+ c1: TypedColumn[T, U1],
+ c2: TypedColumn[T, U2],
+ c3: TypedColumn[T, U3],
+ c4: TypedColumn[T, U4],
+ c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] =
+ selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, U5)]]
+
/**
* Filters rows using the given condition.
* {{{
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 605b15123c6..0e5784326ad 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
-import org.apache.spark.sql.functions.{aggregate, array, broadcast, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
@@ -486,6 +486,19 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
}
}
+ private def validateMyTypeResult(result: Array[(MyType, MyType, MyType)]): Unit = {
+ result.zipWithIndex.foreach { case (row, i) =>
+ val t1 = row._1
+ val t2 = row._2
+ val t3 = row._3
+ assert(t1 === t2)
+ assert(t2 === t3)
+ assert(t1.id == i)
+ assert(t1.a == t1.id % 2)
+ assert(t1.b == t1.id / 10.0d)
+ }
+ }
+
test("Dataset collect complex type") {
val session = spark
import session.implicits._
@@ -502,6 +515,12 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
assert(numRows === 1000)
}
+ test("Dataset typed select - multiple columns") {
+ val result = spark.range(1000).select(count("id"), sum("id")).first()
+ assert(result.getLong(0) === 1000)
+ assert(result.getLong(1) === 499500)
+ }
+
test("Dataset typed select - complex column") {
val session = spark
import session.implicits._
@@ -511,6 +530,16 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
validateMyTypeResult(ds.collect())
}
+ test("Dataset typed select - multiple complex columns") {
+ val session = spark
+ import session.implicits._
+ val s = struct(generateMyTypeColumns: _*).as[MyType]
+ val ds = session
+ .range(3)
+ .select(s, s, s)
+ validateMyTypeResult(ds.collect())
+ }
+
test("lambda functions") {
// This test is mostly to validate lambda variables are properly resolved.
val result = spark
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 95d6fddc97c..cb62977d937 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -307,6 +307,41 @@ class PlanGenerationTestSuite
simple.select(fn.struct(fn.col("id"), fn.col("a")).as(encoder))
}
+ test("select typed 2-arg") {
+ val encoder = ScalaReflection.encoderFor[(Long, Int)]
+ val encoder2 = ScalaReflection.encoderFor[(Double, Double)]
+ val col = fn.struct(fn.col("id"), fn.col("a"))
+ val col2 = fn.struct(fn.col("a"), fn.col("b"))
+ simple.select(col.as(encoder), col2.as(encoder2))
+ }
+
+ test("select typed 3-arg") {
+ val encoder = ScalaReflection.encoderFor[(Long, Int)]
+ val encoder2 = ScalaReflection.encoderFor[(Double, Double)]
+ val col = fn.struct(fn.col("id"), fn.col("a"))
+ val col2 = fn.struct(fn.col("a"), fn.col("b"))
+ simple.select(col.as(encoder), col2.as(encoder2), col.as(encoder))
+ }
+
+ test("select typed 4-arg") {
+ val encoder = ScalaReflection.encoderFor[(Long, Int)]
+ val col = fn.struct(fn.col("id"), fn.col("a"))
+ simple.select(col.as(encoder), col.as(encoder), col.as(encoder), col.as(encoder))
+ }
+
+ test("select typed 5-arg") {
+ val encoder = ScalaReflection.encoderFor[(Long, Int)]
+ val encoder2 = ScalaReflection.encoderFor[(Double, Double)]
+ val col = fn.struct(fn.col("id"), fn.col("a"))
+ val col2 = fn.struct(fn.col("a"), fn.col("b"))
+ simple.select(
+ col.as(encoder),
+ col2.as(encoder2),
+ col.as(encoder),
+ col.as(encoder),
+ col2.as(encoder2))
+ }
+
test("limit") {
simple.limit(10)
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index a2b4762f0a9..f62841c05cb 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -150,8 +150,7 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.select"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"),
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"), // protected
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.reduce"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupByKey"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.explode"), // deprecated
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_2-arg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_2-arg.explain
new file mode 100644
index 00000000000..324519c6853
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_2-arg.explain
@@ -0,0 +1,2 @@
+Project [struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(a, a#0, b, b#0) AS struct(a, b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_3-arg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_3-arg.explain
new file mode 100644
index 00000000000..403c9346c17
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_3-arg.explain
@@ -0,0 +1,2 @@
+Project [struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(a, a#0, b, b#0) AS struct(a, b)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_4-arg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_4-arg.explain
new file mode 100644
index 00000000000..714ed57fff1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_4-arg.explain
@@ -0,0 +1,2 @@
+Project [struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_5-arg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_5-arg.explain
new file mode 100644
index 00000000000..b11439319a1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_5-arg.explain
@@ -0,0 +1,2 @@
+Project [struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(a, a#0, b, b#0) AS struct(a, b)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(a, a#0, b, b#0) AS struct(a, b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.json b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.json
new file mode 100644
index 00000000000..c9c6c752356
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.json
@@ -0,0 +1,42 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.proto.bin
new file mode 100644
index 00000000000..37f3915cd8d
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.json b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.json
new file mode 100644
index 00000000000..23850dcb136
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.json
@@ -0,0 +1,55 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.proto.bin
new file mode 100644
index 00000000000..b3b56953a85
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.json b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.json
new file mode 100644
index 00000000000..2bbdb60794d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.json
@@ -0,0 +1,68 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.proto.bin
new file mode 100644
index 00000000000..bacccff22ae
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.json b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.json
new file mode 100644
index 00000000000..4f57c0ef821
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.json
@@ -0,0 +1,81 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }, {
+ "unresolvedFunction": {
+ "functionName": "struct",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.proto.bin
new file mode 100644
index 00000000000..2c51e208888
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.proto.bin differ
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
index 24c8bad5c2f..6599916ec7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
@@ -19,12 +19,14 @@ package org.apache.spark.sql.catalyst.encoders
import java.{sql => jsql}
import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInt}
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
+import java.util.concurrent.ConcurrentHashMap
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.Utils
/**
* A non implementation specific encoder. This encoder contains all the information needed
@@ -107,6 +109,18 @@ object AgnosticEncoders {
override def dataType: DataType = schema
}
+ object ProductEncoder {
+ val cachedCls = new ConcurrentHashMap[Int, Class[_]]
+ def tuple(encoders: Seq[AgnosticEncoder[_]]): AgnosticEncoder[_] = {
+ val fields = encoders.zipWithIndex.map {
+ case (e, id) => EncoderField(s"_${id + 1}", e, e.nullable, Metadata.empty)
+ }
+ val cls = cachedCls.computeIfAbsent(encoders.size,
+ _ => Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}"))
+ ProductEncoder[Any](ClassTag(cls), fields)
+ }
+ }
+
abstract class BaseRowEncoder extends AgnosticEncoder[Row] {
override def isPrimitive: Boolean = false
override def dataType: DataType = schema
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]