Repository: spark
Updated Branches:
  refs/heads/master 8a87f7d5c -> 699a4dfd8


[SPARK-14632] randomSplit method fails on dataframes with maps in schema

## What changes were proposed in this pull request?

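This patch fixes randomSplit, which fails on DataFrames whose schema
contains map columns. randomSplit sorts each input partition on every
output column, and MapType columns cannot be sorted, so they are now
excluded from the sort order. The bug was introduced in Spark 1.6.1.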
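
The filtering step can be sketched outside Spark as follows. This is only an illustrative model: `DataType`, `MapType`, `Attribute`, and `SortableColumns` here are simplified, hypothetical stand-ins for the Catalyst classes, not the real ones. Only the `filterNot(_.dataType.isInstanceOf[MapType])` call mirrors the patch.

```scala
// Hypothetical, simplified stand-ins for Catalyst's data types and attributes.
sealed trait DataType
case object IntegerType extends DataType
case object StringType extends DataType
final case class MapType(keyType: DataType, valueType: DataType) extends DataType

final case class Attribute(name: String, dataType: DataType)

object SortableColumns {
  // Keep only the columns that can take part in the per-partition sort;
  // map-typed columns are dropped because they have no ordering.
  def sortable(output: Seq[Attribute]): Seq[Attribute] =
    output.filterNot(_.dataType.isInstanceOf[MapType])

  def main(args: Array[String]): Unit = {
    val schema = Seq(
      Attribute("id", IntegerType),
      Attribute("tags", MapType(StringType, StringType)),
      Attribute("name", StringType))
    // prints "id, name"
    println(sortable(schema).map(_.name).mkString(", "))
  }
}
```

In the actual change, the remaining attributes are wrapped in `SortOrder(_, Ascending)` and passed to `Sort`, as the diff shows.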

## How was this patch tested?

Tested with unit tests.


Author: Subhobrata Dey <[email protected]>

Closes #12438 from sbcd90/randomSplitIssue.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/699a4dfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/699a4dfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/699a4dfd

Branch: refs/heads/master
Commit: 699a4dfd89dc598e79955cfd6f66c06b6bf74be6
Parents: 8a87f7d
Author: Subhobrata Dey <[email protected]>
Authored: Sun Apr 17 15:18:32 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sun Apr 17 15:18:32 2016 -0700

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/699a4dfd/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 4edc90d..fb3e184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1502,7 +1502,9 @@ class Dataset[T] private[sql](
     // constituent partitions each time a split is materialized which could result in
     // overlapping splits. To prevent this, we explicitly sort each input partition to make the
     // ordering deterministic.
-    val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
+    // MapType cannot be sorted.
+    val sorted = Sort(logicalPlan.output.filterNot(_.dataType.isInstanceOf[MapType])
+      .map(SortOrder(_, Ascending)), global = false, logicalPlan)
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>


