Repository: spark Updated Branches: refs/heads/branch-1.1 bd3ce2ffb -> 0b354be2f
[SPARK-3048][MLLIB] add LabeledPoint.parse and remove loadStreamingLabeledPoints Move `parse()` from `LabeledPointParser` to `LabeledPoint` and make it public. This breaks binary compatibility only when a user uses synthesized methods like `tupled` and `curried`, which is rare. `LabeledPoint.parse` is more consistent with `Vectors.parse`, which is why `LabeledPointParser` is not preferred. freeman-lab tdas Author: Xiangrui Meng <[email protected]> Closes #1952 from mengxr/labelparser and squashes the following commits: c818fb2 [Xiangrui Meng] merge master ce20e6f [Xiangrui Meng] update mima excludes b386b8d [Xiangrui Meng] fix tests 2436b3d [Xiangrui Meng] add parse() to LabeledPoint (cherry picked from commit 7e70708a99949549adde00cb6246a9582bbc4929) Signed-off-by: Xiangrui Meng <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b354be2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b354be2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b354be2 Branch: refs/heads/branch-1.1 Commit: 0b354be2f9ec35547a60591acf4f4773a4869690 Parents: bd3ce2f Author: Xiangrui Meng <[email protected]> Authored: Sat Aug 16 15:13:34 2014 -0700 Committer: Xiangrui Meng <[email protected]> Committed: Sat Aug 16 15:13:43 2014 -0700 ---------------------------------------------------------------------- .../examples/mllib/StreamingLinearRegression.scala | 7 +++---- .../spark/mllib/regression/LabeledPoint.scala | 2 +- .../StreamingLinearRegressionWithSGD.scala | 2 +- .../org/apache/spark/mllib/util/MLUtils.scala | 17 ++--------------- .../spark/mllib/regression/LabeledPointSuite.scala | 4 ++-- .../StreamingLinearRegressionSuite.scala | 6 +++--- project/MimaExcludes.scala | 5 +++++ 7 files changed, 17 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0b354be2/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala index 1fd37ed..0e992fa 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala @@ -18,8 +18,7 @@ package org.apache.spark.examples.mllib import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.MLUtils -import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD +import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} @@ -56,8 +55,8 @@ object StreamingLinearRegression { val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression") val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) - val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0)) - val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1)) + val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse) + val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse) val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0))) http://git-wip-us.apache.org/repos/asf/spark/blob/0b354be2/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 62a03af..17c753c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -36,7 +36,7 @@ case class LabeledPoint(label: Double, features: Vector) { /** * Parser for [[org.apache.spark.mllib.regression.LabeledPoint]]. */ -private[mllib] object LabeledPointParser { +object LabeledPoint { /** * Parses a string resulted from `LabeledPoint#toString` into * an [[org.apache.spark.mllib.regression.LabeledPoint]]. http://git-wip-us.apache.org/repos/asf/spark/blob/0b354be2/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index 8851097..1d11fde 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.linalg.Vector /** * Train or predict a linear regression model on streaming data. Training uses http://git-wip-us.apache.org/repos/asf/spark/blob/0b354be2/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index f4cce86..ca35100 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PartitionwiseSampledRDD import org.apache.spark.util.random.BernoulliSampler -import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint} +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext @@ -185,7 +185,7 @@ object MLUtils { * @return labeled points stored as an RDD[LabeledPoint] */ def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] = - sc.textFile(path, minPartitions).map(LabeledPointParser.parse) + sc.textFile(path, minPartitions).map(LabeledPoint.parse) /** * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of @@ -195,19 +195,6 @@ object MLUtils { loadLabeledPoints(sc, dir, sc.defaultMinPartitions) /** - * Loads streaming labeled points from a stream of text files - * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`. - * See `StreamingContext.textFileStream` for more details on how to - * generate a stream from files - * - * @param ssc Streaming context - * @param dir Directory path in any Hadoop-supported file system URI - * @return Labeled points stored as a DStream[LabeledPoint] - */ - def loadStreamingLabeledPoints(ssc: StreamingContext, dir: String): DStream[LabeledPoint] = - ssc.textFileStream(dir).map(LabeledPointParser.parse) - - /** * Load labeled data from a file. The data format used here is * <L>, <f1> <f2> ... * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double. http://git-wip-us.apache.org/repos/asf/spark/blob/0b354be2/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala index d9308aa..110c44a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala @@ -28,12 +28,12 @@ class LabeledPointSuite extends FunSuite { LabeledPoint(1.0, Vectors.dense(1.0, 0.0)), LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0)))) points.foreach { p => - assert(p === LabeledPointParser.parse(p.toString)) + assert(p === LabeledPoint.parse(p.toString)) } } test("parse labeled points with v0.9 format") { - val point = LabeledPointParser.parse("1.0,1.0 0.0 -2.0") + val point = LabeledPoint.parse("1.0,1.0 0.0 -2.0") assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/0b354be2/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index ed21f84..45e25ee 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -26,7 +26,7 @@ import com.google.common.io.Files import org.scalatest.FunSuite import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext, MLUtils} +import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext} import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.util.Utils @@ -55,7 +55,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { val numBatches = 10 val batchDuration = Milliseconds(1000) val ssc = new StreamingContext(sc, batchDuration) - val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString) + val data = ssc.textFileStream(testDir.toString).map(LabeledPoint.parse) val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.1) @@ -97,7 +97,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { val batchDuration = Milliseconds(2000) val ssc = new StreamingContext(sc, batchDuration) val numBatches = 5 - val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString) + val data = ssc.textFileStream(testDir.toString()).map(LabeledPoint.parse) val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.dense(0.0)) .setStepSize(0.1) http://git-wip-us.apache.org/repos/asf/spark/blob/0b354be2/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index bbe68b2..3005893 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -129,6 +129,11 @@ object MimaExcludes { Seq( // new Vector methods in MLlib (binary compatible assuming users do not implement Vector) ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Vector.copy") ) ++ + Seq( // synthetic methods generated in LabeledPoint + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.regression.LabeledPoint$"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.regression.LabeledPoint.apply"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.regression.LabeledPoint.toString") + ) ++ Seq ( // Scala 2.11 compatibility fix ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.<init>$default$2") ) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
