This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d66f26c3e5d [SPARK-50879][ML][PYTHON][CONNECT] Support feature scalers on Connect
6d66f26c3e5d is described below

commit 6d66f26c3e5d5513b0ceeafe4ea14686e170e9c4
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Jan 21 16:08:59 2025 +0800

    [SPARK-50879][ML][PYTHON][CONNECT] Support feature scalers on Connect
    
    ### What changes were proposed in this pull request?
    Support feature scalers on Connect:
    
    - org.apache.spark.ml.feature.StandardScaler
    - org.apache.spark.ml.feature.MaxAbsScaler
    - org.apache.spark.ml.feature.MinMaxScaler
    
    ### Why are the changes needed?
    For feature parity between Spark Connect and Spark Classic.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, these new algorithms are now supported on Connect.
    
    ### How was this patch tested?
    Added tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #49581 from zhengruifeng/ml_connect_scaler.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
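
    A minimal PySpark sketch of the user-facing change, assuming a Spark Connect
    endpoint at sc://localhost:15002 (the endpoint URL and the sample data are
    illustrative assumptions, not part of this patch):

        from pyspark.sql import SparkSession
        from pyspark.ml.feature import StandardScaler
        from pyspark.ml.linalg import Vectors

        # Create a Connect session instead of a classic JVM-local session.
        spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

        df = spark.createDataFrame(
            [(Vectors.dense([0.0]),), (Vectors.dense([2.0]),), (Vectors.dense([3.0]),)],
            ["features"],
        )

        # Fitting now runs on the Connect server; summary attributes such as
        # mean and std are fetched back to the client.
        model = StandardScaler(inputCol="features", outputCol="scaled").fit(df)
        print(model.mean, model.std)
        model.transform(df).show()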
---
 dev/sparktestsupport/modules.py                    |   1 +
 .../scala/org/apache/spark/ml/linalg/Vectors.scala |   2 +
 .../services/org.apache.spark.ml.Estimator         |   6 +
 .../services/org.apache.spark.ml.Transformer       |   5 +
 .../org/apache/spark/ml/feature/MaxAbsScaler.scala |   2 +
 .../org/apache/spark/ml/feature/MinMaxScaler.scala |   2 +
 .../apache/spark/ml/feature/StandardScaler.scala   |   2 +
 .../ml/tests/connect/test_parity_feature.py        |  95 +++++++++++++++
 python/pyspark/ml/tests/test_feature.py            | 132 ++++++++++++++++++++-
 .../org/apache/spark/sql/connect/ml/MLUtils.scala  |   5 +
 10 files changed, 249 insertions(+), 3 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 5fd3f7377276..61c065e71115 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1125,6 +1125,7 @@ pyspark_ml_connect = Module(
         "pyspark.ml.tests.connect.test_parity_regression",
         "pyspark.ml.tests.connect.test_parity_clustering",
         "pyspark.ml.tests.connect.test_parity_evaluation",
+        "pyspark.ml.tests.connect.test_parity_feature",
     ],
     excluded_python_implementations=[
         "PyPy"  # Skip these tests under PyPy since they require numpy, 
pandas, and pyarrow and
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 5a0ee9307ab8..94548afb6c29 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -240,6 +240,8 @@ sealed trait Vector extends Serializable {
 @Since("2.0.0")
 object Vectors {
 
+  private[ml] val empty: Vector = zeros(0)
+
   /**
    * Creates a dense vector from its values.
    */
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index 4046cca07dc0..23f70521214d 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -43,3 +43,9 @@ org.apache.spark.ml.recommendation.ALS
 
 # fpm
 org.apache.spark.ml.fpm.FPGrowth
+
+
+# feature
+org.apache.spark.ml.feature.StandardScaler
+org.apache.spark.ml.feature.MaxAbsScaler
+org.apache.spark.ml.feature.MinMaxScaler
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
index 7c10796f9a87..4b029ae610d7 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
@@ -41,3 +41,8 @@ org.apache.spark.ml.recommendation.ALSModel
 
 # fpm
 org.apache.spark.ml.fpm.FPGrowthModel
+
+# feature
+org.apache.spark.ml.feature.StandardScalerModel
+org.apache.spark.ml.feature.MaxAbsScalerModel
+org.apache.spark.ml.feature.MinMaxScalerModel
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index 1a378cd85f3e..66dbabc6187e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -107,6 +107,8 @@ class MaxAbsScalerModel private[ml] (
 
   import MaxAbsScalerModel._
 
+  private[ml] def this() = this(Identifiable.randomUID("maxAbsScal"), Vectors.empty)
+
   /** @group setParam */
   @Since("2.0.0")
   def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index c311f4260424..e3b0590524f3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -154,6 +154,8 @@ class MinMaxScalerModel private[ml] (
 
   import MinMaxScalerModel._
 
+  private[ml] def this() = this(Identifiable.randomUID("minMaxScal"), Vectors.empty, Vectors.empty)
+
   /** @group setParam */
   @Since("1.5.0")
   def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index f1e48b053d88..546463c15844 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -147,6 +147,8 @@ class StandardScalerModel private[ml] (
 
   import StandardScalerModel._
 
+  private[ml] def this() = this(Identifiable.randomUID("stdScal"), Vectors.empty, Vectors.empty)
+
   /** @group setParam */
   @Since("1.2.0")
   def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/python/pyspark/ml/tests/connect/test_parity_feature.py b/python/pyspark/ml/tests/connect/test_parity_feature.py
new file mode 100644
index 000000000000..105ba07df43b
--- /dev/null
+++ b/python/pyspark/ml/tests/connect/test_parity_feature.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.ml.tests.test_feature import FeatureTestsMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class FeatureParityTests(FeatureTestsMixin, ReusedConnectTestCase):
+    @unittest.skip("Need to support.")
+    def test_binarizer(self):
+        super().test_binarizer()
+
+    @unittest.skip("Need to support.")
+    def test_idf(self):
+        super().test_idf()
+
+    @unittest.skip("Need to support.")
+    def test_ngram(self):
+        super().test_ngram()
+
+    @unittest.skip("Need to support.")
+    def test_stopwordsremover(self):
+        super().test_stopwordsremover()
+
+    @unittest.skip("Need to support.")
+    def test_count_vectorizer_with_binary(self):
+        super().test_count_vectorizer_with_binary()
+
+    @unittest.skip("Need to support.")
+    def test_count_vectorizer_with_maxDF(self):
+        super().test_count_vectorizer_with_maxDF()
+
+    @unittest.skip("Need to support.")
+    def test_count_vectorizer_from_vocab(self):
+        super().test_count_vectorizer_from_vocab()
+
+    @unittest.skip("Need to support.")
+    def test_rformula_force_index_label(self):
+        super().test_rformula_force_index_label()
+
+    @unittest.skip("Need to support.")
+    def test_rformula_string_indexer_order_type(self):
+        super().test_rformula_string_indexer_order_type()
+
+    @unittest.skip("Need to support.")
+    def test_string_indexer_handle_invalid(self):
+        super().test_string_indexer_handle_invalid()
+
+    @unittest.skip("Need to support.")
+    def test_string_indexer_from_labels(self):
+        super().test_string_indexer_from_labels()
+
+    @unittest.skip("Need to support.")
+    def test_target_encoder_binary(self):
+        super().test_target_encoder_binary()
+
+    @unittest.skip("Need to support.")
+    def test_target_encoder_continuous(self):
+        super().test_target_encoder_continuous()
+
+    @unittest.skip("Need to support.")
+    def test_vector_size_hint(self):
+        super().test_vector_size_hint()
+
+    @unittest.skip("Need to support.")
+    def test_apply_binary_term_freqs(self):
+        super().test_apply_binary_term_freqs()
+
+
+if __name__ == "__main__":
+    from pyspark.ml.tests.connect.test_parity_feature import *  # noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py
index 92919adecd06..a46fdd22e2bc 100644
--- a/python/pyspark/ml/tests/test_feature.py
+++ b/python/pyspark/ml/tests/test_feature.py
@@ -16,8 +16,11 @@
 # limitations under the License.
 #
 
+import tempfile
 import unittest
 
+import numpy as np
+
 from pyspark.ml.feature import (
     Binarizer,
     CountVectorizer,
@@ -26,6 +29,12 @@ from pyspark.ml.feature import (
     IDF,
     NGram,
     RFormula,
+    StandardScaler,
+    StandardScalerModel,
+    MaxAbsScaler,
+    MaxAbsScalerModel,
+    MinMaxScaler,
+    MinMaxScalerModel,
     StopWordsRemover,
     StringIndexer,
     StringIndexerModel,
@@ -38,7 +47,122 @@ from pyspark.testing.utils import QuietTest
 from pyspark.testing.mlutils import check_params, SparkSessionTestCase
 
 
-class FeatureTests(SparkSessionTestCase):
+class FeatureTestsMixin:
+    def test_standard_scaler(self):
+        df = (
+            self.spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense([0.0])),
+                    (2, 2.0, Vectors.dense([2.0])),
+                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+                ],
+                ["index", "weight", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("weight")
+            .select("features")
+        )
+        scaler = StandardScaler(inputCol="features", outputCol="scaled")
+        self.assertEqual(scaler.getInputCol(), "features")
+        self.assertEqual(scaler.getOutputCol(), "scaled")
+
+        # Estimator save & load
+        with tempfile.TemporaryDirectory(prefix="standard_scaler") as d:
+            scaler.write().overwrite().save(d)
+            scaler2 = StandardScaler.load(d)
+            self.assertEqual(str(scaler), str(scaler2))
+
+        model = scaler.fit(df)
+        self.assertTrue(np.allclose(model.mean.toArray(), [1.66666667], atol=1e-4))
+        self.assertTrue(np.allclose(model.std.toArray(), [1.52752523], atol=1e-4))
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["features", "scaled"])
+        self.assertEqual(output.count(), 3)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+            model.write().overwrite().save(d)
+            model2 = StandardScalerModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
+    def test_maxabs_scaler(self):
+        df = (
+            self.spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense([0.0])),
+                    (2, 2.0, Vectors.dense([2.0])),
+                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+                ],
+                ["index", "weight", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("weight")
+            .select("features")
+        )
+
+        scaler = MaxAbsScaler(inputCol="features", outputCol="scaled")
+        self.assertEqual(scaler.getInputCol(), "features")
+        self.assertEqual(scaler.getOutputCol(), "scaled")
+
+        # Estimator save & load
+        with tempfile.TemporaryDirectory(prefix="maxabs_scaler") as d:
+            scaler.write().overwrite().save(d)
+            scaler2 = MaxAbsScaler.load(d)
+            self.assertEqual(str(scaler), str(scaler2))
+
+        model = scaler.fit(df)
+        self.assertTrue(np.allclose(model.maxAbs.toArray(), [3.0], atol=1e-4))
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["features", "scaled"])
+        self.assertEqual(output.count(), 3)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+            model.write().overwrite().save(d)
+            model2 = MaxAbsScalerModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
+    def test_minmax_scaler(self):
+        df = (
+            self.spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense([0.0])),
+                    (2, 2.0, Vectors.dense([2.0])),
+                    (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+                ],
+                ["index", "weight", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("weight")
+            .select("features")
+        )
+
+        scaler = MinMaxScaler(inputCol="features", outputCol="scaled")
+        self.assertEqual(scaler.getInputCol(), "features")
+        self.assertEqual(scaler.getOutputCol(), "scaled")
+
+        # Estimator save & load
+        with tempfile.TemporaryDirectory(prefix="maxabs_scaler") as d:
+            scaler.write().overwrite().save(d)
+            scaler2 = MinMaxScaler.load(d)
+            self.assertEqual(str(scaler), str(scaler2))
+
+        model = scaler.fit(df)
+        self.assertTrue(np.allclose(model.originalMax.toArray(), [3.0], atol=1e-4))
+        self.assertTrue(np.allclose(model.originalMin.toArray(), [0.0], atol=1e-4))
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["features", "scaled"])
+        self.assertEqual(output.count(), 3)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+            model.write().overwrite().save(d)
+            model2 = MinMaxScalerModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
     def test_binarizer(self):
         b0 = Binarizer()
         self.assertListEqual(
@@ -530,8 +654,6 @@ class FeatureTests(SparkSessionTestCase):
         expected = DenseVector([0.0, 10.0, 0.5])
         self.assertEqual(output, expected)
 
-
-class HashingTFTest(SparkSessionTestCase):
     def test_apply_binary_term_freqs(self):
         df = self.spark.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
         n = 10
@@ -554,6 +676,10 @@ class HashingTFTest(SparkSessionTestCase):
             )
 
 
+class FeatureTests(FeatureTestsMixin, SparkSessionTestCase):
+    pass
+
+
 if __name__ == "__main__":
     from pyspark.ml.tests.test_feature import *  # noqa: F401
 
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index b85bc6771f8e..04dbb60cb1ed 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -425,6 +425,11 @@ private[ml] object MLUtils {
  // leave a security hole, we define an allowed attribute list that can be accessed.
   // The attributes could be retrieved from the corresponding python class
   private lazy val ALLOWED_ATTRIBUTES = HashSet(
+    "mean", // StandardScalerModel
+    "std", // StandardScalerModel
+    "maxAbs", // MaxAbsScalerModel
+    "originalMax", // MinMaxScalerModel
+    "originalMin", // MinMaxScalerModel
     "toString",
     "toDebugString",
     "numFeatures",

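A short sketch of what the new ALLOWED_ATTRIBUTES entries permit: each listed
name is a model attribute the Python Connect client may fetch from the
server-side model (session and df as in the earlier sketch; printed values
depend on the fitted data):

    from pyspark.ml.feature import MaxAbsScaler, MinMaxScaler

    # "maxAbs" is now on the server's attribute allowlist.
    maxabs_model = MaxAbsScaler(inputCol="features", outputCol="scaled").fit(df)
    print(maxabs_model.maxAbs)

    # "originalMax" and "originalMin" likewise.
    minmax_model = MinMaxScaler(inputCol="features", outputCol="scaled").fit(df)
    print(minmax_model.originalMax, minmax_model.originalMin)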
