Repository: spark
Updated Branches:
  refs/heads/master d38febee4 -> 910a13b3c


[SPARK-1157][MLlib] Bug fix: lossHistory should exclude rejection steps, and remove miniBatch

Get the lossHistory from Breeze's API, which already excludes the rejected
steps in line search. Also, remove the miniBatch option from LBFGS:
quasi-Newton methods approximate the inverse Hessian from differences of
successive gradients, so the approximation does not make sense when those
gradients are computed from a varying, subsampled objective.
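
For context, the patch replaces Breeze's one-shot minimize() with the
iterator-based iterations() API; Breeze emits a State only for accepted
steps, so collecting state.value per state yields a loss history without
rejected line-search evaluations. A minimal standalone sketch of the
pattern (the helper name collectLossHistory and its parameters are
illustrative; the Breeze types are the same ones the patch uses):

  import scala.collection.mutable.ArrayBuffer
  import breeze.linalg.{DenseVector => BDV}
  import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS}

  // Runs L-BFGS and records the loss at every accepted step. Breeze only
  // emits a State after a line-search step is accepted, so state.value
  // already excludes rejected evaluations.
  def collectLossHistory(
      costFun: DiffFunction[BDV[Double]],
      init: BDV[Double],
      maxIter: Int,
      numCorrections: Int,
      tol: Double): (BDV[Double], Array[Double]) = {
    val lbfgs = new BreezeLBFGS[BDV[Double]](maxIter, numCorrections, tol)
    val states = lbfgs.iterations(new CachedDiffFunction(costFun), init)
    val lossHistory = new ArrayBuffer[Double]()
    var state = states.next()
    while (states.hasNext) {
      lossHistory.append(state.value)
      state = states.next()
    }
    lossHistory.append(state.value) // loss at the final accepted state
    (state.x, lossHistory.toArray)
  }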

Author: DB Tsai <[email protected]>

Closes #582 from dbtsai/dbtsai-lbfgs-bug and squashes the following commits:

9cc6cf9 [DB Tsai] Removed the miniBatch in LBFGS.
1ba6a33 [DB Tsai] Formatting the code.
d72c679 [DB Tsai] Using Breeze's states to get the loss.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/910a13b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/910a13b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/910a13b3

Branch: refs/heads/master
Commit: 910a13b3c52a6309068b4997da6df6b7d6058a1b
Parents: d38febe
Author: DB Tsai <[email protected]>
Authored: Thu May 8 17:53:22 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Thu May 8 17:53:22 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/mllib/optimization/LBFGS.scala | 63 ++++++++------------
 .../spark/mllib/optimization/LBFGSSuite.scala   | 15 ++---
 2 files changed, 30 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/910a13b3/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index 969a0c5..8f187c9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -42,7 +42,6 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
   private var convergenceTol = 1E-4
   private var maxNumIterations = 100
   private var regParam = 0.0
-  private var miniBatchFraction = 1.0
 
   /**
    * Set the number of corrections used in the LBFGS update. Default 10.
@@ -58,14 +57,6 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
   }
 
   /**
-   * Set fraction of data to be used for each L-BFGS iteration. Default 1.0.
-   */
-  def setMiniBatchFraction(fraction: Double): this.type = {
-    this.miniBatchFraction = fraction
-    this
-  }
-
-  /**
    * Set the convergence tolerance of iterations for L-BFGS. Default 1E-4.
   * Smaller value will lead to higher accuracy with the cost of more iterations.
    */
@@ -110,7 +101,7 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
   }
 
   override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-    val (weights, _) = LBFGS.runMiniBatchLBFGS(
+    val (weights, _) = LBFGS.runLBFGS(
       data,
       gradient,
       updater,
@@ -118,7 +109,6 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
       convergenceTol,
       maxNumIterations,
       regParam,
-      miniBatchFraction,
       initialWeights)
     weights
   }
@@ -132,10 +122,8 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
 @DeveloperApi
 object LBFGS extends Logging {
   /**
-   * Run Limited-memory BFGS (L-BFGS) in parallel using mini batches.
-   * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
-   * in order to compute a gradient estimate.
-   * Sampling, and averaging the subgradients over this subset is performed using one standard
+   * Run Limited-memory BFGS (L-BFGS) in parallel.
+   * Averaging the subgradients over different partitions is performed using one standard
    * spark map-reduce in each iteration.
    *
   * @param data - Input data for L-BFGS. RDD of the set of data examples, each of
@@ -147,14 +135,12 @@ object LBFGS extends Logging {
    * @param convergenceTol - The convergence tolerance of iterations for L-BFGS
   * @param maxNumIterations - Maximal number of iterations that L-BFGS can be run.
    * @param regParam - Regularization parameter
-   * @param miniBatchFraction - Fraction of the input data set that should be used for
-   *                          one iteration of L-BFGS. Default value 1.0.
    *
   * @return A tuple containing two elements. The first element is a column matrix containing
   *         weights for every feature, and the second element is an array containing the loss
    *         computed for every iteration.
    */
-  def runMiniBatchLBFGS(
+  def runLBFGS(
       data: RDD[(Double, Vector)],
       gradient: Gradient,
       updater: Updater,
@@ -162,23 +148,33 @@ object LBFGS extends Logging {
       convergenceTol: Double,
       maxNumIterations: Int,
       regParam: Double,
-      miniBatchFraction: Double,
       initialWeights: Vector): (Vector, Array[Double]) = {
 
     val lossHistory = new ArrayBuffer[Double](maxNumIterations)
 
     val numExamples = data.count()
-    val miniBatchSize = numExamples * miniBatchFraction
 
     val costFun =
-      new CostFun(data, gradient, updater, regParam, miniBatchFraction, lossHistory, miniBatchSize)
+      new CostFun(data, gradient, updater, regParam, numExamples)
 
     val lbfgs = new BreezeLBFGS[BDV[Double]](maxNumIterations, numCorrections, convergenceTol)
 
-    val weights = Vectors.fromBreeze(
-      lbfgs.minimize(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector))
+    val states =
+      lbfgs.iterations(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector)
+
+    /**
+     * NOTE: lossSum and loss are computed using the weights from the previous iteration,
+     * and regVal is the regularization value computed in the previous iteration as well.
+     */
+    var state = states.next()
+    while(states.hasNext) {
+      lossHistory.append(state.value)
+      state = states.next()
+    }
+    lossHistory.append(state.value)
+    val weights = Vectors.fromBreeze(state.x)
 
-    logInfo("LBFGS.runMiniBatchSGD finished. Last 10 losses %s".format(
+    logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
       lossHistory.takeRight(10).mkString(", ")))
 
     (weights, lossHistory.toArray)
@@ -193,9 +189,7 @@ object LBFGS extends Logging {
     gradient: Gradient,
     updater: Updater,
     regParam: Double,
-    miniBatchFraction: Double,
-    lossHistory: ArrayBuffer[Double],
-    miniBatchSize: Double) extends DiffFunction[BDV[Double]] {
+    numExamples: Long) extends DiffFunction[BDV[Double]] {
 
     private var i = 0
 
@@ -204,8 +198,7 @@ object LBFGS extends Logging {
       val localData = data
       val localGradient = gradient
 
-      val (gradientSum, lossSum) = localData.sample(false, miniBatchFraction, 42 + i)
-        .aggregate((BDV.zeros[Double](weights.size), 0.0))(
+      val (gradientSum, lossSum) = localData.aggregate((BDV.zeros[Double](weights.size), 0.0))(
          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
             val l = localGradient.compute(
              features, label, Vectors.fromBreeze(weights), Vectors.fromBreeze(grad))
@@ -223,7 +216,7 @@ object LBFGS extends Logging {
         Vectors.fromBreeze(weights),
         Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
 
-      val loss = lossSum / miniBatchSize + regVal
+      val loss = lossSum / numExamples + regVal
       /**
        * It will return the gradient part of regularization using updater.
        *
@@ -245,14 +238,8 @@ object LBFGS extends Logging {
         Vectors.fromBreeze(weights),
        Vectors.dense(new Array[Double](weights.size)), 1, 1, regParam)._1.toBreeze
 
-      // gradientTotal = gradientSum / miniBatchSize + gradientTotal
-      axpy(1.0 / miniBatchSize, gradientSum, gradientTotal)
-
-      /**
-       * NOTE: lossSum and loss is computed using the weights from the previous iteration
-       * and regVal is the regularization value computed in the previous iteration as well.
-       */
-      lossHistory.append(loss)
+      // gradientTotal = gradientSum / numExamples + gradientTotal
+      axpy(1.0 / numExamples, gradientSum, gradientTotal)
 
       i += 1
 

http://git-wip-us.apache.org/repos/asf/spark/blob/910a13b3/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
index f33770a..6af1b50 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
@@ -59,7 +59,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
     val convergenceTol = 1e-12
     val maxNumIterations = 10
 
-    val (_, loss) = LBFGS.runMiniBatchLBFGS(
+    val (_, loss) = LBFGS.runLBFGS(
       dataRDD,
       gradient,
       simpleUpdater,
@@ -67,7 +67,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
       convergenceTol,
       maxNumIterations,
       regParam,
-      miniBatchFrac,
       initialWeightsWithIntercept)
 
     // Since the cost function is convex, the loss is guaranteed to be monotonically decreasing
@@ -104,7 +103,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
     val convergenceTol = 1e-12
     val maxNumIterations = 10
 
-    val (weightLBFGS, lossLBFGS) = LBFGS.runMiniBatchLBFGS(
+    val (weightLBFGS, lossLBFGS) = LBFGS.runLBFGS(
       dataRDD,
       gradient,
       squaredL2Updater,
@@ -112,7 +111,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
       convergenceTol,
       maxNumIterations,
       regParam,
-      miniBatchFrac,
       initialWeightsWithIntercept)
 
     val numGDIterations = 50
@@ -150,7 +148,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
     val maxNumIterations = 8
     var convergenceTol = 0.0
 
-    val (_, lossLBFGS1) = LBFGS.runMiniBatchLBFGS(
+    val (_, lossLBFGS1) = LBFGS.runLBFGS(
       dataRDD,
       gradient,
       squaredL2Updater,
@@ -158,7 +156,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
       convergenceTol,
       maxNumIterations,
       regParam,
-      miniBatchFrac,
       initialWeightsWithIntercept)
 
     // Note that the first loss is computed with initial weights,
@@ -166,7 +163,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
     assert(lossLBFGS1.length == 9)
 
     convergenceTol = 0.1
-    val (_, lossLBFGS2) = LBFGS.runMiniBatchLBFGS(
+    val (_, lossLBFGS2) = LBFGS.runLBFGS(
       dataRDD,
       gradient,
       squaredL2Updater,
@@ -174,7 +171,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
       convergenceTol,
       maxNumIterations,
       regParam,
-      miniBatchFrac,
       initialWeightsWithIntercept)
 
     // Based on observation, lossLBFGS2 runs 3 iterations, no theoretically guaranteed.
@@ -182,7 +178,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
     assert((lossLBFGS2(2) - lossLBFGS2(3)) / lossLBFGS2(2) < convergenceTol)
 
     convergenceTol = 0.01
-    val (_, lossLBFGS3) = LBFGS.runMiniBatchLBFGS(
+    val (_, lossLBFGS3) = LBFGS.runLBFGS(
       dataRDD,
       gradient,
       squaredL2Updater,
@@ -190,7 +186,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
       convergenceTol,
       maxNumIterations,
       regParam,
-      miniBatchFrac,
       initialWeightsWithIntercept)
 
     // With smaller convergenceTol, it takes more steps.
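
For reference, a minimal sketch of calling the renamed entry point after
this patch; the choice of LeastSquaresGradient/SimpleUpdater and the
hyperparameter values are illustrative placeholders, while the parameter
list mirrors the new runLBFGS signature above:

  import org.apache.spark.mllib.linalg.{Vector, Vectors}
  import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
  import org.apache.spark.rdd.RDD

  // `data` is an RDD of (label, features) pairs and `numFeatures` its
  // dimensionality; both are assumed to be prepared elsewhere.
  def train(data: RDD[(Double, Vector)], numFeatures: Int): (Vector, Array[Double]) =
    LBFGS.runLBFGS(
      data,
      new LeastSquaresGradient(),
      new SimpleUpdater(),
      numCorrections = 10,
      convergenceTol = 1e-4,
      maxNumIterations = 100,
      regParam = 0.0,
      initialWeights = Vectors.dense(new Array[Double](numFeatures)))

Note there is no miniBatchFraction argument any more: each iteration
aggregates the gradient over the full data set, so the objective stays
fixed across the quasi-Newton updates.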
