Repository: spark
Updated Branches:
  refs/heads/master a21a3bbe6 -> 607a27a0d


[SPARK-15051][SQL] Create a TypedColumn alias

## What changes were proposed in this pull request?

Currently when we create an alias against a TypedColumn from user-defined 
Aggregator(for example: agg(aggSum.toColumn as "a")), spark is using the alias' 
function from Column( as), the alias function will return a column contains a 
TypedAggregateExpression, which is unresolved because the inputDeserializer is 
not defined. Later the aggregator function (agg) will inject the 
inputDeserializer back to the TypedAggregateExpression, but only if the 
aggregate columns are TypedColumn, in the above case, the 
TypedAggregateExpression will remain unresolved because it is under column and 
caused the
problem reported by this jira 
[15051](https://issues.apache.org/jira/browse/SPARK-15051?jql=project%20%3D%20SPARK).

This PR propose to create an alias function for TypedColumn,  it will return a 
TypedColumn. It is using the similar code path  as Column's alia function.

For the spark build in aggregate function, like max, it is working with alias, 
for example

val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
checkAnswer(df1.agg(max("j") as "b"), Row(3) :: Nil)

Thanks for comments.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, 
manual tests)

Add test cases in DatasetAggregatorSuite.scala
run the sql related queries against this patch.

Author: Kevin Yu <[email protected]>

Closes #12893 from kevinyu98/spark-15051.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/607a27a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/607a27a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/607a27a0

Branch: refs/heads/master
Commit: 607a27a0d149be049091bcf274a73b8476b36c90
Parents: a21a3bb
Author: Kevin Yu <[email protected]>
Authored: Sat May 7 11:13:48 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Sat May 7 11:13:48 2016 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/Column.scala | 19 +++++++++++++------
 .../spark/sql/DatasetAggregatorSuite.scala       |  8 ++++++++
 2 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/607a27a0/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index c58adda..9b8334d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -68,6 +68,18 @@ class TypedColumn[-T, U](
     }
     new TypedColumn[T, U](newExpr, encoder)
   }
+
+  /**
+   * Gives the TypedColumn a name (alias).
+   * If the current TypedColumn has metadata associated with it, this metadata 
will be propagated
+   * to the new column.
+   *
+   * @group expr_ops
+   * @since 2.0.0
+   */
+  override def name(alias: String): TypedColumn[T, U] =
+    new TypedColumn[T, U](super.name(alias).expr, encoder)
+
 }
 
 /**
@@ -910,12 +922,7 @@ class Column(protected[sql] val expr: Expression) extends 
Logging {
    * @group expr_ops
    * @since 1.3.0
    */
-  def as(alias: Symbol): Column = withExpr {
-    expr match {
-      case ne: NamedExpression => Alias(expr, alias.name)(explicitMetadata = 
Some(ne.metadata))
-      case other => Alias(other, alias.name)()
-    }
-  }
+  def as(alias: Symbol): Column = name(alias.name)
 
   /**
    * Gives the column an alias with metadata.

http://git-wip-us.apache.org/repos/asf/spark/blob/607a27a0/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 6eae3ed..b2a0f3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -232,4 +232,12 @@ class DatasetAggregatorSuite extends QueryTest with 
SharedSQLContext {
       "a" -> Seq(1, 2)
     )
   }
+
+  test("spark-15051 alias of aggregator in DataFrame/Dataset[Row]") {
+    val df1 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
+    checkAnswer(df1.agg(RowAgg.toColumn as "b"), Row(6) :: Nil)
+
+    val df2 = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
+    checkAnswer(df2.agg(RowAgg.toColumn as "b").select("b"), Row(6) :: Nil)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to