Repository: spark
Updated Branches:
refs/heads/branch-2.0 beb753004 -> 7e2bfff20
[SPARK-15807][SQL] Support varargs for dropDuplicates in Dataset/DataFrame
## What changes were proposed in this pull request?
This PR adds a `varargs` overload of `dropDuplicates` to `Dataset`/`DataFrame`.
Currently, `dropDuplicates` accepts column names only as a `Seq` or an `Array`, so a call such as `ds.dropDuplicates("_1", "_2")` does not compile.
**Before**
```scala
scala> val ds = spark.createDataFrame(Seq(("a", 1), ("b", 2), ("a", 2)))
ds: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
scala> ds.dropDuplicates(Seq("_1", "_2"))
res0: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: string, _2: int]
scala> ds.dropDuplicates("_1", "_2")
<console>:26: error: overloaded method value dropDuplicates with alternatives:
  (colNames: Array[String])org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] <and>
  (colNames: Seq[String])org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] <and>
  ()org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 cannot be applied to (String, String)
ds.dropDuplicates("_1", "_2")
^
```
**After**
```scala
scala> val ds = spark.createDataFrame(Seq(("a", 1), ("b", 2), ("a", 2)))
ds: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
scala> ds.dropDuplicates("_1", "_2")
res0: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: string, _2: int]
```
## How was this patch tested?
Passed the Jenkins tests, including the new test cases.
Author: Dongjoon Hyun <[email protected]>
Closes #13545 from dongjoon-hyun/SPARK-15807.
(cherry picked from commit 3fd2ff4dd85633af49865456a52bf0c09c99708b)
Signed-off-by: Reynold Xin <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e2bfff2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e2bfff2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e2bfff2
Branch: refs/heads/branch-2.0
Commit: 7e2bfff20c7278a20dca857cfd452b96d4d97c1a
Parents: beb7530
Author: Dongjoon Hyun <[email protected]>
Authored: Sat Jun 11 15:47:51 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat Jun 11 15:47:57 2016 -0700
----------------------------------------------------------------------
.../src/main/scala/org/apache/spark/sql/Dataset.scala | 13 +++++++++++++
.../scala/org/apache/spark/sql/DataFrameSuite.scala | 4 ++++
.../test/scala/org/apache/spark/sql/DatasetSuite.scala | 13 +++++++++++++
3 files changed, 30 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7e2bfff2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 16bbf30..5a67fc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1834,6 +1834,19 @@ class Dataset[T] private[sql](
def dropDuplicates(colNames: Array[String]): Dataset[T] =
dropDuplicates(colNames.toSeq)
/**
+ * Returns a new [[Dataset]] with duplicate rows removed, considering only
+ * the subset of columns.
+ *
+ * @group typedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
+ val colNames: Seq[String] = col1 +: cols
+ dropDuplicates(colNames)
+ }
+
+ /**
* Computes statistics for numeric columns, including count, mean, stddev,
min, and max.
* If no columns are given, this function computes statistics for all
numerical columns.
*
http://git-wip-us.apache.org/repos/asf/spark/blob/7e2bfff2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a02e48d..6bb0ce9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -906,6 +906,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
checkAnswer(
testData.dropDuplicates(Seq("value2")),
Seq(Row(2, 1, 2), Row(1, 1, 1)))
+
+ checkAnswer(
+ testData.dropDuplicates("key", "value1"),
+ Seq(Row(2, 1, 2), Row(1, 2, 1), Row(1, 1, 1), Row(2, 2, 2)))
}
test("SPARK-7150 range api") {
http://git-wip-us.apache.org/repos/asf/spark/blob/7e2bfff2/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 11b52bd..4536a73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -806,6 +806,19 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
assert(e.getMessage.contains("Null value appeared in non-nullable field"))
assert(e.getMessage.contains("top level non-flat input object"))
}
+
+ test("dropDuplicates") {
+ val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+ checkDataset(
+ ds.dropDuplicates("_1"),
+ ("a", 1), ("b", 1))
+ checkDataset(
+ ds.dropDuplicates("_2"),
+ ("a", 1), ("a", 2))
+ checkDataset(
+ ds.dropDuplicates("_1", "_2"),
+ ("a", 1), ("a", 2), ("b", 1))
+ }
}
case class Generic[T](id: T, value: Double)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]