This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cdc66a71ea3d [SPARK-45757][ML] Avoid re-computation of NNZ in Binarizer
cdc66a71ea3d is described below

commit cdc66a71ea3d5da0f3f3ef7eabb215336ae472a5
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Nov 2 14:27:14 2023 -0700

    [SPARK-45757][ML] Avoid re-computation of NNZ in Binarizer
    
    ### What changes were proposed in this pull request?
    1. Compress vectors in Binarizer using the already-known `nnz` (number of nonzeros);
    
    2. Rename the internal function `def compressed(nnz: Int): Vector` to 
`compressedWithNNZ` to avoid an ambiguous-reference error (the call can also be 
parsed as `vec.compressed.apply(nnz)`) when there is no type hint:
    ```
    [error] 
/Users/ruifeng.zheng/Dev/spark/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala:132:61:
 ambiguous reference to overloaded definition,
    [error] both method compressed in trait Vector of type (nnz: Int): 
org.apache.spark.ml.linalg.Vector
    [error] and  method compressed in trait Vector of type 
org.apache.spark.ml.linalg.Vector
    ```
    
    ### Why are the changes needed?
    `nnz` is already known before compression, so there is no need for 
`compressed` to recompute it via `numNonzeros`.
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #43619 from zhengruifeng/ml_binarizer_nnz.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/ml/linalg/Vectors.scala |  4 ++--
 .../org/apache/spark/ml/feature/Binarizer.scala    | 26 +++++++++++++---------
 .../apache/spark/ml/feature/VectorAssembler.scala  |  6 +++--
 3 files changed, 22 insertions(+), 14 deletions(-)

diff --git 
a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 016a8366ab86..d8e17ddd24db 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -184,9 +184,9 @@ sealed trait Vector extends Serializable {
    * Returns a vector in either dense or sparse format, whichever uses less 
storage.
    */
   @Since("2.0.0")
-  def compressed: Vector = compressed(numNonzeros)
+  def compressed: Vector = compressedWithNNZ(numNonzeros)
 
-  private[ml] def compressed(nnz: Int): Vector = {
+  private[ml] def compressedWithNNZ(nnz: Int): Vector = {
     // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs 12 
* nnz + 20 bytes.
     if (1.5 * (nnz + 1.0) < size) {
       toSparseWithSize(nnz)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
index 2ec7a8632e39..2e09e7444957 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -112,10 +112,11 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
         (Seq($(inputCol)), Seq($(outputCol)), Seq($(threshold)))
       }
 
-    val mappedOutputCols = inputColNames.zip(tds).map { case (inputColName, 
td) =>
-      val binarizerUDF = dataset.schema(inputColName).dataType match {
+    val mappedOutputCols = inputColNames.zip(tds).map { case (colName, td) =>
+      dataset.schema(colName).dataType match {
         case DoubleType =>
-          udf { in: Double => if (in > td) 1.0 else 0.0 }
+          when(!col(colName).isNaN && col(colName) > td, lit(1.0))
+            .otherwise(lit(0.0))
 
         case _: VectorUDT if td >= 0 =>
           udf { vector: Vector =>
@@ -124,27 +125,32 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
             vector.foreachNonZero { (index, value) =>
               if (value > td) {
                 indices += index
-                values +=  1.0
+                values += 1.0
               }
             }
-            Vectors.sparse(vector.size, indices.result(), 
values.result()).compressed
-          }
+
+            val idxArray = indices.result()
+            val valArray = values.result()
+            Vectors.sparse(vector.size, idxArray, valArray)
+              .compressedWithNNZ(idxArray.length)
+          }.apply(col(colName))
 
         case _: VectorUDT if td < 0 =>
           this.logWarning(s"Binarization operations on sparse dataset with 
negative threshold " +
             s"$td will build a dense output, so take care when applying to 
sparse input.")
           udf { vector: Vector =>
             val values = Array.fill(vector.size)(1.0)
+            var nnz = vector.size
             vector.foreachNonZero { (index, value) =>
               if (value <= td) {
                 values(index) = 0.0
+                nnz -= 1
               }
             }
-            Vectors.dense(values).compressed
-          }
-      }
 
-      binarizerUDF(col(inputColName))
+            Vectors.dense(values).compressedWithNNZ(nnz)
+          }.apply(col(colName))
+      }
     }
 
     val outputMetadata = outputColNames.map(outputSchema(_).metadata)
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 761352e34a3e..cf5b5ecb2014 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -296,7 +296,9 @@ object VectorAssembler extends 
DefaultParamsReadable[VectorAssembler] {
         throw new SparkException(s"$o of type ${o.getClass.getName} is not 
supported.")
     }
 
-    val (idxArray, valArray) = (indices.result(), values.result())
-    Vectors.sparse(featureIndex, idxArray, 
valArray).compressed(idxArray.length)
+    val idxArray = indices.result()
+    val valArray = values.result()
+    Vectors.sparse(featureIndex, idxArray, valArray)
+      .compressedWithNNZ(idxArray.length)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to