This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b10111625d78 [SPARK-50923][SPARK-50927][ML][PYTHON][CONNECT] Support 
FMClassifier and FMRegressor on Connect
b10111625d78 is described below

commit b10111625d78eab36376e2aaacf0d0fd940ad89b
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Jan 27 17:59:22 2025 +0800

    [SPARK-50923][SPARK-50927][ML][PYTHON][CONNECT] Support FMClassifier and 
FMRegressor on Connect
    
    ### What changes were proposed in this pull request?
    Support FMClassifier and FMRegressor on Connect
    
    ### Why are the changes needed?
    for parity
    
    ### Does this PR introduce _any_ user-facing change?
    yes
    
    ### How was this patch tested?
    added tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #49685 from zhengruifeng/ml_connect_fm.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit 0e4242d4b2e19e52c5f7bb2b84ef6c55ee6fa9c5)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../services/org.apache.spark.ml.Estimator         |   2 +
 .../services/org.apache.spark.ml.Transformer       |   2 +
 .../spark/ml/classification/FMClassifier.scala     |   3 +
 .../apache/spark/ml/regression/FMRegressor.scala   |   3 +
 python/pyspark/ml/tests/test_classification.py     | 102 +++++++++++++++++++++
 python/pyspark/ml/tests/test_regression.py         |  65 +++++++++++++
 .../org/apache/spark/sql/connect/ml/MLUtils.scala  |   2 +
 7 files changed, 179 insertions(+)

diff --git 
a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator 
b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index 595355b0c1e4..61338f561868 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -22,6 +22,7 @@
 org.apache.spark.ml.classification.NaiveBayes
 org.apache.spark.ml.classification.LinearSVC
 org.apache.spark.ml.classification.LogisticRegression
+org.apache.spark.ml.classification.FMClassifier
 org.apache.spark.ml.classification.MultilayerPerceptronClassifier
 org.apache.spark.ml.classification.DecisionTreeClassifier
 org.apache.spark.ml.classification.RandomForestClassifier
@@ -32,6 +33,7 @@ org.apache.spark.ml.regression.AFTSurvivalRegression
 org.apache.spark.ml.regression.IsotonicRegression
 org.apache.spark.ml.regression.LinearRegression
 org.apache.spark.ml.regression.GeneralizedLinearRegression
+org.apache.spark.ml.regression.FMRegressor
 org.apache.spark.ml.regression.DecisionTreeRegressor
 org.apache.spark.ml.regression.RandomForestRegressor
 org.apache.spark.ml.regression.GBTRegressor
diff --git 
a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer 
b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
index 0375bac51d39..04cde68ec806 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
@@ -38,6 +38,7 @@ org.apache.spark.ml.feature.HashingTF
 org.apache.spark.ml.classification.NaiveBayesModel
 org.apache.spark.ml.classification.LinearSVCModel
 org.apache.spark.ml.classification.LogisticRegressionModel
+org.apache.spark.ml.classification.FMClassificationModel
 org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel
 org.apache.spark.ml.classification.DecisionTreeClassificationModel
 org.apache.spark.ml.classification.RandomForestClassificationModel
@@ -48,6 +49,7 @@ org.apache.spark.ml.regression.AFTSurvivalRegressionModel
 org.apache.spark.ml.regression.IsotonicRegressionModel
 org.apache.spark.ml.regression.LinearRegressionModel
 org.apache.spark.ml.regression.GeneralizedLinearRegressionModel
+org.apache.spark.ml.regression.FMRegressionModel
 org.apache.spark.ml.regression.DecisionTreeRegressionModel
 org.apache.spark.ml.regression.RandomForestRegressionModel
 org.apache.spark.ml.regression.GBTRegressionModel
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
index 33e7c1fdd5e0..0ef16cb42776 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
@@ -259,6 +259,9 @@ class FMClassificationModel private[classification] (
     with FMClassifierParams with MLWritable
     with HasTrainingSummary[FMClassificationTrainingSummary]{
 
+  private[ml] def this() = this(Identifiable.randomUID("fmc"),
+    Double.NaN, Vectors.empty, Matrices.empty)
+
   @Since("3.0.0")
   override val numClasses: Int = 2
 
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala 
b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
index 182107a443c1..02ef1df2c44e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
@@ -461,6 +461,9 @@ class FMRegressionModel private[regression] (
   extends RegressionModel[Vector, FMRegressionModel]
   with FMRegressorParams with MLWritable {
 
+  private[ml] def this() = this(Identifiable.randomUID("fmr"),
+    Double.NaN, Vectors.empty, Matrices.empty)
+
   @Since("3.0.0")
   override val numFeatures: Int = linear.size
 
diff --git a/python/pyspark/ml/tests/test_classification.py 
b/python/pyspark/ml/tests/test_classification.py
index bea622db9079..0bcf680933ef 100644
--- a/python/pyspark/ml/tests/test_classification.py
+++ b/python/pyspark/ml/tests/test_classification.py
@@ -34,6 +34,10 @@ from pyspark.ml.classification import (
     LogisticRegressionModel,
     LogisticRegressionSummary,
     BinaryLogisticRegressionSummary,
+    FMClassifier,
+    FMClassificationModel,
+    FMClassificationSummary,
+    FMClassificationTrainingSummary,
     DecisionTreeClassifier,
     DecisionTreeClassificationModel,
     RandomForestClassifier,
@@ -447,6 +451,104 @@ class ClassificationTestsMixin:
             model2 = LinearSVCModel.load(d)
             self.assertEqual(str(model), str(model2))
 
+    def test_factorization_machine(self):
+        spark = self.spark
+        df = (
+            spark.createDataFrame(
+                [
+                    (1.0, 1.0, Vectors.dense(0.0, 5.0)),
+                    (0.0, 2.0, Vectors.dense(1.0, 2.0)),
+                    (1.0, 3.0, Vectors.dense(2.0, 1.0)),
+                    (0.0, 4.0, Vectors.dense(3.0, 3.0)),
+                ],
+                ["label", "weight", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("weight")
+        )
+
+        fm = FMClassifier(factorSize=2, maxIter=1, regParam=1.0, seed=1)
+        self.assertEqual(fm.getFactorSize(), 2)
+        self.assertEqual(fm.getMaxIter(), 1)
+        self.assertEqual(fm.getRegParam(), 1.0)
+        self.assertEqual(fm.getSeed(), 1)
+
+        model = fm.fit(df)
+        self.assertEqual(fm.uid, model.uid)
+        self.assertEqual(model.numClasses, 2)
+        self.assertEqual(model.numFeatures, 2)
+        self.assertTrue(
+            np.allclose(model.intercept, 0.9999070647126924, atol=1e-4), 
model.intercept
+        )
+        self.assertTrue(
+            np.allclose(
+                model.linear.toArray(), [-0.999999959956255, 
0.9999999201744205], atol=1e-4
+            ),
+            model.linear,
+        )
+        self.assertTrue(
+            np.allclose(
+                model.factors.toArray(),
+                [[0.99999918, 0.99999858], [-0.99999943, 0.99999854]],
+                atol=1e-4,
+            ),
+            model.factors,
+        )
+
+        vec = Vectors.dense(0.0, 5.0)
+        pred = model.predict(vec)
+        self.assertEqual(pred, 1.0)
+        pred = model.predictRaw(vec)
+        self.assertTrue(
+            np.allclose(pred.toArray(), [-5.9999066655847955, 
5.9999066655847955], atol=1e-4),
+            pred,
+        )
+        pred = model.predictProbability(vec)
+        self.assertTrue(
+            np.allclose(pred.toArray(), [0.002472853377527451, 
0.9975271466224725], atol=1e-4),
+            pred,
+        )
+
+        output = model.transform(df)
+        expected_cols = [
+            "label",
+            "weight",
+            "features",
+            "rawPrediction",
+            "probability",
+            "prediction",
+        ]
+        self.assertEqual(output.columns, expected_cols)
+        self.assertEqual(output.count(), 4)
+
+        # model summary
+        self.assertTrue(model.hasSummary)
+        summary = model.summary()
+        self.assertIsInstance(summary, FMClassificationSummary)
+        self.assertIsInstance(summary, FMClassificationTrainingSummary)
+        self.assertEqual(summary.labels, [0.0, 1.0])
+        self.assertEqual(summary.accuracy, 0.25)
+        self.assertEqual(summary.areaUnderROC, 0.5)
+        self.assertEqual(summary.predictions.columns, expected_cols)
+
+        summary2 = model.evaluate(df)
+        self.assertIsInstance(summary2, FMClassificationSummary)
+        self.assertFalse(isinstance(summary2, FMClassificationTrainingSummary))
+        self.assertEqual(summary2.labels, [0.0, 1.0])
+        self.assertEqual(summary2.accuracy, 0.25)
+        self.assertEqual(summary2.areaUnderROC, 0.5)
+        self.assertEqual(summary2.predictions.columns, expected_cols)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="factorization_machine") as d:
+            fm.write().overwrite().save(d)
+            fm2 = FMClassifier.load(d)
+            self.assertEqual(str(fm), str(fm2))
+
+            model.write().overwrite().save(d)
+            model2 = FMClassificationModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
     def test_decision_tree_classifier(self):
         df = (
             self.spark.createDataFrame(
diff --git a/python/pyspark/ml/tests/test_regression.py 
b/python/pyspark/ml/tests/test_regression.py
index 322a3d70f9e9..f07f75ebeb6c 100644
--- a/python/pyspark/ml/tests/test_regression.py
+++ b/python/pyspark/ml/tests/test_regression.py
@@ -35,6 +35,8 @@ from pyspark.ml.regression import (
     GeneralizedLinearRegressionTrainingSummary,
     LinearRegressionSummary,
     LinearRegressionTrainingSummary,
+    FMRegressor,
+    FMRegressionModel,
     DecisionTreeRegressor,
     DecisionTreeRegressionModel,
     RandomForestRegressor,
@@ -368,6 +370,69 @@ class RegressionTestsMixin:
             model2 = GeneralizedLinearRegressionModel.load(d)
             self.assertEqual(str(model), str(model2))
 
+    def test_factorization_machine(self):
+        spark = self.spark
+        df = (
+            spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense(0.0, 0.0)),
+                    (2, 1.0, Vectors.dense(1.0, 2.0)),
+                    (3, 2.0, Vectors.dense(0.0, 0.0)),
+                    (4, 2.0, Vectors.dense(1.0, 1.0)),
+                ],
+                ["index", "label", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("index")
+            .select("label", "features")
+        )
+
+        fm = FMRegressor(factorSize=2, maxIter=1, regParam=1.0, seed=1)
+        self.assertEqual(fm.getFactorSize(), 2)
+        self.assertEqual(fm.getMaxIter(), 1)
+        self.assertEqual(fm.getRegParam(), 1.0)
+        self.assertEqual(fm.getSeed(), 1)
+
+        model = fm.fit(df)
+        self.assertEqual(fm.uid, model.uid)
+        self.assertEqual(model.numFeatures, 2)
+        self.assertTrue(
+            np.allclose(model.intercept, 0.9999999966668874, atol=1e-4), 
model.intercept
+        )
+        self.assertTrue(
+            np.allclose(
+                model.linear.toArray(), [0.9999999933342161, 
0.9999999950008276], atol=1e-4
+            ),
+            model.linear,
+        )
+        self.assertTrue(
+            np.allclose(
+                model.factors.toArray(),
+                [[-0.99999954, -0.9999992], [0.99999968, -0.99999918]],
+                atol=1e-4,
+            ),
+            model.factors,
+        )
+
+        vec = Vectors.dense(0.0, 5.0)
+        pred = model.predict(vec)
+        self.assertTrue(np.allclose(pred, 5.999999971671025, atol=1e-4), pred)
+
+        output = model.transform(df)
+        expected_cols = ["label", "features", "prediction"]
+        self.assertEqual(output.columns, expected_cols)
+        self.assertEqual(output.count(), 4)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="factorization_machine") as d:
+            fm.write().overwrite().save(d)
+            fm2 = FMRegressor.load(d)
+            self.assertEqual(str(fm), str(fm2))
+
+            model.write().overwrite().save(d)
+            model2 = FMRegressionModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
     def test_decision_tree_regressor(self):
         df = self.df
 
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index 38181590484b..56526b7e6737 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -530,6 +530,7 @@ private[ml] object MLUtils {
       Set("intercept", "coefficients", "interceptVector", "coefficientMatrix", 
"evaluate")),
     (classOf[LogisticRegressionSummary], Set("probabilityCol", "featuresCol")),
     (classOf[BinaryLogisticRegressionSummary], Set("scoreCol")),
+    (classOf[FMClassificationModel], Set("intercept", "linear", "factors", 
"evaluate")),
     (classOf[MultilayerPerceptronClassificationModel], Set("weights", 
"evaluate")),
 
     // Regression Models
@@ -589,6 +590,7 @@ private[ml] object MLUtils {
         "tValues",
         "pValues")),
     (classOf[LinearRegressionTrainingSummary], Set("objectiveHistory", 
"totalIterations")),
+    (classOf[FMRegressionModel], Set("intercept", "linear", "factors")),
 
     // Clustering Models
     (classOf[KMeansModel], Set("predict", "numFeatures", 
"clusterCenterMatrix")),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to