This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 83c36c881746 [SPARK-50879][ML][PYTHON][CONNECT] Support feature scalers on Connect
83c36c881746 is described below
commit 83c36c881746b56b14c0e46d7533fb54202affc7
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Jan 21 16:08:59 2025 +0800
[SPARK-50879][ML][PYTHON][CONNECT] Support feature scalers on Connect
### What changes were proposed in this pull request?
Support feature scalers on Connect:
- org.apache.spark.ml.feature.StandardScaler
- org.apache.spark.ml.feature.MaxAbsScaler
- org.apache.spark.ml.feature.MinMaxScaler
### Why are the changes needed?
For feature parity between classic Spark ML and Spark Connect.
### Does this PR introduce _any_ user-facing change?
Yes, new algorithms are supported on Connect.
### How was this patch tested?
Added tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
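As an illustration (not part of this patch), a minimal PySpark sketch of what this enables against a Connect session; it assumes `spark` is already a Connect-backed SparkSession:

```python
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

# Toy input; any DataFrame with a vector column works.
df = spark.createDataFrame(
    [(Vectors.dense([0.0]),), (Vectors.dense([2.0]),), (Vectors.dense([3.0]),)],
    ["features"],
)

scaler = StandardScaler(inputCol="features", outputCol="scaled")
model = scaler.fit(df)          # now runs server-side on Connect
print(model.mean, model.std)    # summary attributes fetched from the server
model.transform(df).show()

# MaxAbsScaler and MinMaxScaler follow the same pattern.
```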
Closes #49581 from zhengruifeng/ml_connect_scaler.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 6d66f26c3e5d5513b0ceeafe4ea14686e170e9c4)
Signed-off-by: Ruifeng Zheng <[email protected]>
---
dev/sparktestsupport/modules.py | 1 +
.../scala/org/apache/spark/ml/linalg/Vectors.scala | 2 +
.../services/org.apache.spark.ml.Estimator | 6 +
.../services/org.apache.spark.ml.Transformer | 5 +
.../org/apache/spark/ml/feature/MaxAbsScaler.scala | 2 +
.../org/apache/spark/ml/feature/MinMaxScaler.scala | 2 +
.../apache/spark/ml/feature/StandardScaler.scala | 2 +
.../ml/tests/connect/test_parity_feature.py | 95 +++++++++++++++
python/pyspark/ml/tests/test_feature.py | 132 ++++++++++++++++++++-
.../org/apache/spark/sql/connect/ml/MLUtils.scala | 5 +
10 files changed, 249 insertions(+), 3 deletions(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 5fd3f7377276..61c065e71115 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1125,6 +1125,7 @@ pyspark_ml_connect = Module(
"pyspark.ml.tests.connect.test_parity_regression",
"pyspark.ml.tests.connect.test_parity_clustering",
"pyspark.ml.tests.connect.test_parity_evaluation",
+ "pyspark.ml.tests.connect.test_parity_feature",
],
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy,
pandas, and pyarrow and
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 5a0ee9307ab8..94548afb6c29 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -240,6 +240,8 @@ sealed trait Vector extends Serializable {
@Since("2.0.0")
object Vectors {
+ private[ml] val empty: Vector = zeros(0)
+
/**
* Creates a dense vector from its values.
*/
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index 4046cca07dc0..23f70521214d 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -43,3 +43,9 @@ org.apache.spark.ml.recommendation.ALS
# fpm
org.apache.spark.ml.fpm.FPGrowth
+
+
+# feature
+org.apache.spark.ml.feature.StandardScaler
+org.apache.spark.ml.feature.MaxAbsScaler
+org.apache.spark.ml.feature.MinMaxScaler
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
index 7c10796f9a87..4b029ae610d7 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
@@ -41,3 +41,8 @@ org.apache.spark.ml.recommendation.ALSModel
# fpm
org.apache.spark.ml.fpm.FPGrowthModel
+
+# feature
+org.apache.spark.ml.feature.StandardScalerModel
+org.apache.spark.ml.feature.MaxAbsScalerModel
+org.apache.spark.ml.feature.MinMaxScalerModel
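These service registrations appear to be what lets the Connect server resolve the estimator and transformer classes by name at runtime (java.util.ServiceLoader scans META-INF/services). A hedged sketch of the client-side effect, reusing `df` from the earlier sketch against a Connect-backed `spark`:

```python
from pyspark.ml.feature import MaxAbsScaler, MinMaxScaler, StandardScaler

# Each fit() asks the server to instantiate the corresponding Estimator;
# the registrations above are presumably what make these names resolvable.
for scaler_cls in (StandardScaler, MaxAbsScaler, MinMaxScaler):
    scaler = scaler_cls(inputCol="features", outputCol="scaled")
    model = scaler.fit(df)
    print(type(model).__name__)
```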
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index 1a378cd85f3e..66dbabc6187e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -107,6 +107,8 @@ class MaxAbsScalerModel private[ml] (
import MaxAbsScalerModel._
+ private[ml] def this() = this(Identifiable.randomUID("maxAbsScal"), Vectors.empty)
+
/** @group setParam */
@Since("2.0.0")
def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index c311f4260424..e3b0590524f3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -154,6 +154,8 @@ class MinMaxScalerModel private[ml] (
import MinMaxScalerModel._
+ private[ml] def this() = this(Identifiable.randomUID("minMaxScal"), Vectors.empty, Vectors.empty)
+
/** @group setParam */
@Since("1.5.0")
def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index f1e48b053d88..546463c15844 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -147,6 +147,8 @@ class StandardScalerModel private[ml] (
import StandardScalerModel._
+ private[ml] def this() = this(Identifiable.randomUID("stdScal"), Vectors.empty, Vectors.empty)
+
/** @group setParam */
@Since("1.2.0")
def setInputCol(value: String): this.type = set(inputCol, value)
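The three no-arg constructors above appear to exist so the server can create the model classes generically (java.util.ServiceLoader requires a no-argument constructor), with the new `Vectors.empty` serving as a placeholder until the real coefficients are attached. A hedged client-side sketch of the save/load round-trip this supports, mirroring the tests below and assuming `df` from the earlier sketch:

```python
import tempfile

from pyspark.ml.feature import StandardScaler, StandardScalerModel

model = StandardScaler(inputCol="features", outputCol="scaled").fit(df)

with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
    model.write().overwrite().save(d)
    # Server-side, the model class is resolved by name and re-created
    # before the persisted mean/std are restored.
    model2 = StandardScalerModel.load(d)
    assert str(model) == str(model2)
```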
diff --git a/python/pyspark/ml/tests/connect/test_parity_feature.py b/python/pyspark/ml/tests/connect/test_parity_feature.py
new file mode 100644
index 000000000000..105ba07df43b
--- /dev/null
+++ b/python/pyspark/ml/tests/connect/test_parity_feature.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.ml.tests.test_feature import FeatureTestsMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class FeatureParityTests(FeatureTestsMixin, ReusedConnectTestCase):
+ @unittest.skip("Need to support.")
+ def test_binarizer(self):
+ super().test_binarizer()
+
+ @unittest.skip("Need to support.")
+ def test_idf(self):
+ super().test_idf()
+
+ @unittest.skip("Need to support.")
+ def test_ngram(self):
+ super().test_ngram()
+
+ @unittest.skip("Need to support.")
+ def test_stopwordsremover(self):
+ super().test_stopwordsremover()
+
+ @unittest.skip("Need to support.")
+ def test_count_vectorizer_with_binary(self):
+ super().test_count_vectorizer_with_binary()
+
+ @unittest.skip("Need to support.")
+ def test_count_vectorizer_with_maxDF(self):
+ super().test_count_vectorizer_with_maxDF()
+
+ @unittest.skip("Need to support.")
+ def test_count_vectorizer_from_vocab(self):
+ super().test_count_vectorizer_from_vocab()
+
+ @unittest.skip("Need to support.")
+ def test_rformula_force_index_label(self):
+ super().test_rformula_force_index_label()
+
+ @unittest.skip("Need to support.")
+ def test_rformula_string_indexer_order_type(self):
+ super().test_rformula_string_indexer_order_type()
+
+ @unittest.skip("Need to support.")
+ def test_string_indexer_handle_invalid(self):
+ super().test_string_indexer_handle_invalid()
+
+ @unittest.skip("Need to support.")
+ def test_string_indexer_from_labels(self):
+ super().test_string_indexer_from_labels()
+
+ @unittest.skip("Need to support.")
+ def test_target_encoder_binary(self):
+ super().test_target_encoder_binary()
+
+ @unittest.skip("Need to support.")
+ def test_target_encoder_continuous(self):
+ super().test_target_encoder_continuous()
+
+ @unittest.skip("Need to support.")
+ def test_vector_size_hint(self):
+ super().test_vector_size_hint()
+
+ @unittest.skip("Need to support.")
+ def test_apply_binary_term_freqs(self):
+ super().test_apply_binary_term_freqs()
+
+
+if __name__ == "__main__":
+ from pyspark.ml.tests.connect.test_parity_feature import * # noqa: F401
+
+ try:
+ import xmlrunner # type: ignore[import]
+
+ testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+ except ImportError:
+ testRunner = None
+ unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py
index 92919adecd06..a46fdd22e2bc 100644
--- a/python/pyspark/ml/tests/test_feature.py
+++ b/python/pyspark/ml/tests/test_feature.py
@@ -16,8 +16,11 @@
# limitations under the License.
#
+import tempfile
import unittest
+import numpy as np
+
from pyspark.ml.feature import (
Binarizer,
CountVectorizer,
@@ -26,6 +29,12 @@ from pyspark.ml.feature import (
IDF,
NGram,
RFormula,
+ StandardScaler,
+ StandardScalerModel,
+ MaxAbsScaler,
+ MaxAbsScalerModel,
+ MinMaxScaler,
+ MinMaxScalerModel,
StopWordsRemover,
StringIndexer,
StringIndexerModel,
@@ -38,7 +47,122 @@ from pyspark.testing.utils import QuietTest
from pyspark.testing.mlutils import check_params, SparkSessionTestCase
-class FeatureTests(SparkSessionTestCase):
+class FeatureTestsMixin:
+ def test_standard_scaler(self):
+ df = (
+ self.spark.createDataFrame(
+ [
+ (1, 1.0, Vectors.dense([0.0])),
+ (2, 2.0, Vectors.dense([2.0])),
+ (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+ ],
+ ["index", "weight", "features"],
+ )
+ .coalesce(1)
+ .sortWithinPartitions("weight")
+ .select("features")
+ )
+ scaler = StandardScaler(inputCol="features", outputCol="scaled")
+ self.assertEqual(scaler.getInputCol(), "features")
+ self.assertEqual(scaler.getOutputCol(), "scaled")
+
+ # Estimator save & load
+ with tempfile.TemporaryDirectory(prefix="standard_scaler") as d:
+ scaler.write().overwrite().save(d)
+ scaler2 = StandardScaler.load(d)
+ self.assertEqual(str(scaler), str(scaler2))
+
+ model = scaler.fit(df)
+ self.assertTrue(np.allclose(model.mean.toArray(), [1.66666667], atol=1e-4))
+ self.assertTrue(np.allclose(model.std.toArray(), [1.52752523], atol=1e-4))
+
+ output = model.transform(df)
+ self.assertEqual(output.columns, ["features", "scaled"])
+ self.assertEqual(output.count(), 3)
+
+ # Model save & load
+ with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+ model.write().overwrite().save(d)
+ model2 = StandardScalerModel.load(d)
+ self.assertEqual(str(model), str(model2))
+
+ def test_maxabs_scaler(self):
+ df = (
+ self.spark.createDataFrame(
+ [
+ (1, 1.0, Vectors.dense([0.0])),
+ (2, 2.0, Vectors.dense([2.0])),
+ (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+ ],
+ ["index", "weight", "features"],
+ )
+ .coalesce(1)
+ .sortWithinPartitions("weight")
+ .select("features")
+ )
+
+ scaler = MaxAbsScaler(inputCol="features", outputCol="scaled")
+ self.assertEqual(scaler.getInputCol(), "features")
+ self.assertEqual(scaler.getOutputCol(), "scaled")
+
+ # Estimator save & load
+ with tempfile.TemporaryDirectory(prefix="maxabs_scaler") as d:
+ scaler.write().overwrite().save(d)
+ scaler2 = MaxAbsScaler.load(d)
+ self.assertEqual(str(scaler), str(scaler2))
+
+ model = scaler.fit(df)
+ self.assertTrue(np.allclose(model.maxAbs.toArray(), [3.0], atol=1e-4))
+
+ output = model.transform(df)
+ self.assertEqual(output.columns, ["features", "scaled"])
+ self.assertEqual(output.count(), 3)
+
+ # Model save & load
+ with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+ model.write().overwrite().save(d)
+ model2 = MaxAbsScalerModel.load(d)
+ self.assertEqual(str(model), str(model2))
+
+ def test_minmax_scaler(self):
+ df = (
+ self.spark.createDataFrame(
+ [
+ (1, 1.0, Vectors.dense([0.0])),
+ (2, 2.0, Vectors.dense([2.0])),
+ (3, 3.0, Vectors.sparse(1, [(0, 3.0)])),
+ ],
+ ["index", "weight", "features"],
+ )
+ .coalesce(1)
+ .sortWithinPartitions("weight")
+ .select("features")
+ )
+
+ scaler = MinMaxScaler(inputCol="features", outputCol="scaled")
+ self.assertEqual(scaler.getInputCol(), "features")
+ self.assertEqual(scaler.getOutputCol(), "scaled")
+
+ # Estimator save & load
+ with tempfile.TemporaryDirectory(prefix="maxabs_scaler") as d:
+ scaler.write().overwrite().save(d)
+ scaler2 = MinMaxScaler.load(d)
+ self.assertEqual(str(scaler), str(scaler2))
+
+ model = scaler.fit(df)
+ self.assertTrue(np.allclose(model.originalMax.toArray(), [3.0], atol=1e-4))
+ self.assertTrue(np.allclose(model.originalMin.toArray(), [0.0], atol=1e-4))
+
+ output = model.transform(df)
+ self.assertEqual(output.columns, ["features", "scaled"])
+ self.assertEqual(output.count(), 3)
+
+ # Model save & load
+ with tempfile.TemporaryDirectory(prefix="standard_scaler_model") as d:
+ model.write().overwrite().save(d)
+ model2 = MinMaxScalerModel.load(d)
+ self.assertEqual(str(model), str(model2))
+
def test_binarizer(self):
b0 = Binarizer()
self.assertListEqual(
@@ -530,8 +654,6 @@ class FeatureTests(SparkSessionTestCase):
expected = DenseVector([0.0, 10.0, 0.5])
self.assertEqual(output, expected)
-
-class HashingTFTest(SparkSessionTestCase):
def test_apply_binary_term_freqs(self):
df = self.spark.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
n = 10
@@ -554,6 +676,10 @@ class HashingTFTest(SparkSessionTestCase):
)
+class FeatureTests(FeatureTestsMixin, SparkSessionTestCase):
+ pass
+
+
if __name__ == "__main__":
from pyspark.ml.tests.test_feature import * # noqa: F401
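As a sanity check on the expected values in test_standard_scaler (not part of the patch): the feature column holds [0.0, 2.0, 3.0], so the mean is 5/3 ≈ 1.66666667, and StandardScaler uses the unbiased sample standard deviation, sqrt(((0 - 5/3)^2 + (2 - 5/3)^2 + (3 - 5/3)^2) / 2) = sqrt(7/3) ≈ 1.52752523. A quick NumPy check:

```python
import numpy as np

values = np.array([0.0, 2.0, 3.0])
print(np.mean(values))         # 1.6666666666666667
print(np.std(values, ddof=1))  # 1.5275252316519468 (sample std, ddof=1)
```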
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index b85bc6771f8e..04dbb60cb1ed 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -425,6 +425,11 @@ private[ml] object MLUtils {
// leave a security hole, we define an allowed attribute list that can be accessed.
// The attributes could be retrieved from the corresponding python class
private lazy val ALLOWED_ATTRIBUTES = HashSet(
+ "mean", // StandardScalerModel
+ "std", // StandardScalerModel
+ "maxAbs", // MaxAbsScalerModel
+ "originalMax", // MinMaxScalerModel
+ "originalMin", // MinMaxScalerModel
"toString",
"toDebugString",
"numFeatures",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]