Repository: spark Updated Branches: refs/heads/branch-2.0 f6d7292d1 -> 4ccc5643f
[SPARK-15051][SQL] Create a TypedColumn alias ## What changes were proposed in this pull request? Currently when we create an alias against a TypedColumn from user-defined Aggregator(for example: agg(aggSum.toColumn as "a")), spark is using the alias' function from Column( as), the alias function will return a column contains a TypedAggregateExpression, which is unresolved because the inputDeserializer is not defined. Later the aggregator function (agg) will inject the inputDeserializer back to the TypedAggregateExpression, but only if the aggregate columns are TypedColumn, in the above case, the TypedAggregateExpression will remain unresolved because it is under column and caused the problem reported by this jira [15051](https://issues.apache.org/jira/browse/SPARK-15051?jql=project%20%3D%20SPARK). This PR propose to create an alias function for TypedColumn, it will return a TypedColumn. It is using the similar code path as Column's alia function. For the spark build in aggregate function, like max, it is working with alias, for example val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j") checkAnswer(df1.agg(max("j") as "b"), Row(3) :: Nil) Thanks for comments. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) Add test cases in DatasetAggregatorSuite.scala run the sql related queries against this patch. Author: Kevin Yu <[email protected]> Closes #12893 from kevinyu98/spark-15051. (cherry picked from commit 607a27a0d149be049091bcf274a73b8476b36c90) Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ccc5643 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ccc5643 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ccc5643 Branch: refs/heads/branch-2.0 Commit: 4ccc5643f90133f5c80514915fd5616c77837f10 Parents: f6d7292 Author: Kevin Yu <[email protected]> Authored: Sat May 7 11:13:48 2016 +0800 Committer: Wenchen Fan <[email protected]> Committed: Sat May 7 11:14:02 2016 +0800 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/sql/Column.scala | 19 +++++++++++++------ .../spark/sql/DatasetAggregatorSuite.scala | 8 ++++++++ 2 files changed, 21 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4ccc5643/sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index c58adda..9b8334d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -68,6 +68,18 @@ class TypedColumn[-T, U]( } new TypedColumn[T, U](newExpr, encoder) } + + /** + * Gives the TypedColumn a name (alias). + * If the current TypedColumn has metadata associated with it, this metadata will be propagated + * to the new column. + * + * @group expr_ops + * @since 2.0.0 + */ + override def name(alias: String): TypedColumn[T, U] = + new TypedColumn[T, U](super.name(alias).expr, encoder) + } /** @@ -910,12 +922,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * @group expr_ops * @since 1.3.0 */ - def as(alias: Symbol): Column = withExpr { - expr match { - case ne: NamedExpression => Alias(expr, alias.name)(explicitMetadata = Some(ne.metadata)) - case other => Alias(other, alias.name)() - } - } + def as(alias: Symbol): Column = name(alias.name) /** * Gives the column an alias with metadata. http://git-wip-us.apache.org/repos/asf/spark/blob/4ccc5643/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index 6eae3ed..b2a0f3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -232,4 +232,12 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { "a" -> Seq(1, 2) ) } + + test("spark-15051 alias of aggregator in DataFrame/Dataset[Row]") { + val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j") + checkAnswer(df1.agg(RowAgg.toColumn as "b"), Row(6) :: Nil) + + val df2 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j") + checkAnswer(df2.agg(RowAgg.toColumn as "b").select("b"), Row(6) :: Nil) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
