[SPARK-19533][EXAMPLES] Convert Java tests to use lambdas, Java 8 features

## What changes were proposed in this pull request?

Convert the Java examples to use lambdas and other Java 8 features: anonymous inner classes implementing Spark's Java function interfaces (`Function`, `Function2`, `PairFunction`, `FlatMapFunction`, and so on) are replaced with lambda expressions or method references throughout. A representative before/after sketch follows.
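For illustration, a minimal sketch of the conversion pattern, drawn from the `JavaWordCount` changes in this patch (it assumes the surrounding context of that example: `lines` is a `JavaRDD<String>` and `SPACE` a compiled `java.util.regex.Pattern`):

```java
// Assumed context from JavaWordCount: JavaRDD<String> lines; Pattern SPACE.

// Before: Java 7 anonymous inner classes implementing Spark's function interfaces.
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public Iterator<String> call(String s) {
    return Arrays.asList(SPACE.split(s)).iterator();
  }
});
JavaPairRDD<String, Integer> counts = words
  .mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<>(s, 1);
    }
  })
  .reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

// After: Java 8 lambdas; the explicit function-interface imports become unnecessary.
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
JavaPairRDD<String, Integer> counts = words
  .mapToPair(s -> new Tuple2<>(s, 1))
  .reduceByKey((i1, i2) -> i1 + i2);
```

Where a lambda would merely delegate to an existing method, a method reference is used instead (for example `reduceByKey(Stats::merge)` and `map(Rating::parseRating)` below), and `final` modifiers that were only needed for anonymous-class capture are dropped, since lambdas can capture effectively final locals.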
## How was this patch tested?

Jenkins tests.

Author: Sean Owen <[email protected]>

Closes #16961 from srowen/SPARK-19533.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de14d35f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de14d35f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de14d35f

Branch: refs/heads/master
Commit: de14d35f77071932963a994fac5aec0e5df838a1
Parents: ba8912e
Author: Sean Owen <[email protected]>
Authored: Sun Feb 19 09:37:56 2017 -0800
Committer: Sean Owen <[email protected]>
Committed: Sun Feb 19 09:37:56 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/examples/JavaLogQuery.java | 21 +--
 .../org/apache/spark/examples/JavaPageRank.java | 49 ++-----
 .../org/apache/spark/examples/JavaSparkPi.java | 20 +--
 .../spark/examples/JavaStatusTrackerDemo.java | 5 +-
 .../java/org/apache/spark/examples/JavaTC.java | 8 +-
 .../apache/spark/examples/JavaWordCount.java | 27 +---
 .../spark/examples/ml/JavaALSExample.java | 7 +-
 ...SelectionViaTrainValidationSplitExample.java | 3 -
 .../spark/examples/ml/JavaTokenizerExample.java | 13 +-
 .../examples/ml/JavaVectorSlicerExample.java | 7 +-
 .../mllib/JavaAssociationRulesExample.java | 6 +-
 .../JavaBinaryClassificationMetricsExample.java | 33 ++---
 .../mllib/JavaBisectingKMeansExample.java | 7 +-
 .../mllib/JavaChiSqSelectorExample.java | 38 ++----
 .../JavaDecisionTreeClassificationExample.java | 26 +---
 .../JavaDecisionTreeRegressionExample.java | 33 ++---
 .../mllib/JavaElementwiseProductExample.java | 27 +---
 .../mllib/JavaGaussianMixtureExample.java | 19 +--
 ...vaGradientBoostingClassificationExample.java | 21 +--
 .../JavaGradientBoostingRegressionExample.java | 30 +----
 .../mllib/JavaIsotonicRegressionExample.java | 39 ++----
 .../spark/examples/mllib/JavaKMeansExample.java | 19 +--
 .../spark/examples/mllib/JavaLBFGSExample.java | 23 +---
 .../JavaLatentDirichletAllocationExample.java | 28 ++--
 .../JavaLinearRegressionWithSGDExample.java | 47 +++----
 .../JavaLogisticRegressionWithLBFGSExample.java | 14 +-
 ...aMulticlassClassificationMetricsExample.java | 13 +-
 .../examples/mllib/JavaNaiveBayesExample.java | 19 +--
 .../JavaPowerIterationClusteringExample.java | 6 +-
 .../JavaRandomForestClassificationExample.java | 23 +---
 .../JavaRandomForestRegressionExample.java | 37 ++---
 .../mllib/JavaRankingMetricsExample.java | 135 ++++++-------------
 .../mllib/JavaRecommendationExample.java | 58 +++-----
 .../mllib/JavaRegressionMetricsExample.java | 31 ++---
 .../examples/mllib/JavaSVMWithSGDExample.java | 13 +-
 .../examples/mllib/JavaSimpleFPGrowth.java | 12 +-
 .../mllib/JavaStreamingTestExample.java | 40 ++----
 .../examples/sql/JavaSQLDataSourceExample.java | 8 +-
 .../spark/examples/sql/JavaSparkSQLExample.java | 60 +++------
 .../examples/sql/hive/JavaSparkHiveExample.java | 9 +-
 .../streaming/JavaStructuredKafkaWordCount.java | 10 +-
 .../JavaStructuredNetworkWordCount.java | 11 +-
 .../JavaStructuredNetworkWordCountWindowed.java | 16 +--
 .../examples/streaming/JavaCustomReceiver.java | 34 +----
 .../streaming/JavaDirectKafkaWordCount.java | 31 +----
 .../examples/streaming/JavaFlumeEventCount.java | 8 +-
 .../examples/streaming/JavaKafkaWordCount.java | 33 +----
 .../streaming/JavaNetworkWordCount.java | 25 +---
 .../examples/streaming/JavaQueueStream.java | 24 +---
 .../JavaRecoverableNetworkWordCount.java | 91 +++++--------
 .../streaming/JavaSqlNetworkWordCount.java | 51 +++----
 .../streaming/JavaStatefulNetworkWordCount.java | 30 +----
 52 files changed, 380 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 7775443..cf12de3 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -17,18 +17,16 @@
 
 package org.apache.spark.examples;
 
-import com.google.common.collect.Lists;
 import scala.Tuple2;
 import scala.Tuple3;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -40,7 +38,7 @@ import java.util.regex.Pattern;
  */
 public final class JavaLogQuery {
 
-  public static final List<String> exampleApacheLogs = Lists.newArrayList(
+  public static final List<String> exampleApacheLogs = Arrays.asList(
     "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
     "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
     "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
@@ -109,19 +107,10 @@ public final class JavaLogQuery {
     JavaRDD<String> dataSet = (args.length == 1) ?
      jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);
 
-    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
-      @Override
-      public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
-        return new Tuple2<>(extractKey(s), extractStats(s));
-      }
-    });
+    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted =
+      dataSet.mapToPair(s -> new Tuple2<>(extractKey(s), extractStats(s)));
 
-    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
-      @Override
-      public Stats call(Stats stats, Stats stats2) {
-        return stats.merge(stats2);
-      }
-    });
+    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(Stats::merge);
 
     List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
     for (Tuple2<?,?> t : output) {

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index bcc493b..b5b4703 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -19,7 +19,6 @@ package org.apache.spark.examples;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
@@ -28,10 +27,7 @@ import com.google.common.collect.Iterables;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 
 /**
@@ -90,52 +86,35 @@ public final class JavaPageRank {
     JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     // Loads all URLs from input file and initialize their neighbors.
-    JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
-      new PairFunction<String, String, String>() {
-        @Override
-        public Tuple2<String, String> call(String s) {
-          String[] parts = SPACES.split(s);
-          return new Tuple2<>(parts[0], parts[1]);
-        }
-      }).distinct().groupByKey().cache();
+    JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
+      String[] parts = SPACES.split(s);
+      return new Tuple2<>(parts[0], parts[1]);
+    }).distinct().groupByKey().cache();
 
     // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
-    JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
-      @Override
-      public Double call(Iterable<String> rs) {
-        return 1.0;
-      }
-    });
+    JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);
 
     // Calculates and updates URL ranks continuously using PageRank algorithm.
     for (int current = 0; current < Integer.parseInt(args[1]); current++) {
       // Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values() - .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() { - @Override - public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) { - int urlCount = Iterables.size(s._1); - List<Tuple2<String, Double>> results = new ArrayList<>(); - for (String n : s._1) { - results.add(new Tuple2<>(n, s._2() / urlCount)); - } - return results.iterator(); + .flatMapToPair(s -> { + int urlCount = Iterables.size(s._1()); + List<Tuple2<String, Double>> results = new ArrayList<>(); + for (String n : s._1) { + results.add(new Tuple2<>(n, s._2() / urlCount)); } - }); + return results.iterator(); + }); // Re-calculates URL ranks based on neighbor contributions. - ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() { - @Override - public Double call(Double sum) { - return 0.15 + sum * 0.85; - } - }); + ranks = contribs.reduceByKey(new Sum()).mapValues(sum -> 0.15 + sum * 0.85); } // Collects all URL ranks and dump them to console. List<Tuple2<String, Double>> output = ranks.collect(); for (Tuple2<?,?> tuple : output) { - System.out.println(tuple._1() + " has rank: " + tuple._2() + "."); + System.out.println(tuple._1() + " has rank: " + tuple._2() + "."); } spark.stop(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index 89855e8..cb4b265 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -19,8 +19,6 @@ package org.apache.spark.examples; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.sql.SparkSession; import java.util.ArrayList; @@ -49,19 +47,11 @@ public final class JavaSparkPi { JavaRDD<Integer> dataSet = jsc.parallelize(l, slices); - int count = dataSet.map(new Function<Integer, Integer>() { - @Override - public Integer call(Integer integer) { - double x = Math.random() * 2 - 1; - double y = Math.random() * 2 - 1; - return (x * x + y * y <= 1) ? 1 : 0; - } - }).reduce(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer integer, Integer integer2) { - return integer + integer2; - } - }); + int count = dataSet.map(integer -> { + double x = Math.random() * 2 - 1; + double y = Math.random() * 2 - 1; + return (x * x + y * y <= 1) ? 
1 : 0; + }).reduce((integer, integer2) -> integer + integer2); System.out.println("Pi is roughly " + 4.0 * count / n); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java index 6f899c7..b0ebedf 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusTrackerDemo.java @@ -25,7 +25,6 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.SparkSession; - import java.util.Arrays; import java.util.List; @@ -50,11 +49,11 @@ public final class JavaStatusTrackerDemo { .appName(APP_NAME) .getOrCreate(); - final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); + JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); // Example of implementing a progress reporter for a simple job. JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map( - new IdentityWithDelay<Integer>()); + new IdentityWithDelay<>()); JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync(); while (!jobFuture.isDone()) { Thread.sleep(1000); // 1 second http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaTC.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java index f12ca77..bde30b8 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java @@ -80,13 +80,7 @@ public final class JavaTC { // the graph to obtain the path (x, z). // Because join() joins on keys, the edges are stored in reversed order. 
- JavaPairRDD<Integer, Integer> edges = tc.mapToPair( - new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) { - return new Tuple2<>(e._2(), e._1()); - } - }); + JavaPairRDD<Integer, Integer> edges = tc.mapToPair(e -> new Tuple2<>(e._2(), e._1())); long oldCount; long nextCount = tc.count(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index 8f18604..f1ce1e9 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -21,13 +21,9 @@ import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.SparkSession; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -48,28 +44,11 @@ public final class JavaWordCount { JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD(); - JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String s) { - return Arrays.asList(SPACE.split(s)).iterator(); - } - }); + JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator()); - JavaPairRDD<String, Integer> ones = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }); + JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1)); - JavaPairRDD<String, Integer> counts = ones.reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2); List<Tuple2<String, Integer>> output = counts.collect(); for (Tuple2<?,?> tuple : output) { http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java index 739558e..33ba668 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java @@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession; import java.io.Serializable; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.ml.evaluation.RegressionEvaluator; import org.apache.spark.ml.recommendation.ALS; import org.apache.spark.ml.recommendation.ALSModel; @@ -88,11 +87,7 @@ public class JavaALSExample { // $example on$ JavaRDD<Rating> ratingsRDD = spark .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD() - .map(new Function<String, Rating>() { - public Rating call(String str) { - 
return Rating.parseRating(str); - } - }); + .map(Rating::parseRating); Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class); Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2}); Dataset<Row> training = splits[0]; http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java index 0f96293..9a4722b 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java @@ -32,9 +32,6 @@ import org.apache.spark.sql.SparkSession; /** * Java example demonstrating model selection using TrainValidationSplit. * - * The example is based on {@link org.apache.spark.examples.ml.JavaSimpleParamsExample} - * using linear regression. - * * Run with * {{{ * bin/run-example ml.JavaModelSelectionViaTrainValidationSplitExample http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java index 004e9b1..3f809eb 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java @@ -69,20 +69,17 @@ public class JavaTokenizerExample { .setOutputCol("words") .setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false); - spark.udf().register("countTokens", new UDF1<WrappedArray<String>, Integer>() { - @Override - public Integer call(WrappedArray<String> words) { - return words.size(); - } - }, DataTypes.IntegerType); + spark.udf().register("countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType); Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame); tokenized.select("sentence", "words") - .withColumn("tokens", callUDF("countTokens", col("words"))).show(false); + .withColumn("tokens", callUDF("countTokens", col("words"))) + .show(false); Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame); regexTokenized.select("sentence", "words") - .withColumn("tokens", callUDF("countTokens", col("words"))).show(false); + .withColumn("tokens", callUDF("countTokens", col("words"))) + .show(false); // $example off$ spark.stop(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java index 1922514..1ae48be 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java @@ -20,10 +20,9 @@ package org.apache.spark.examples.ml; 
import org.apache.spark.sql.SparkSession; // $example on$ +import java.util.Arrays; import java.util.List; -import com.google.common.collect.Lists; - import org.apache.spark.ml.attribute.Attribute; import org.apache.spark.ml.attribute.AttributeGroup; import org.apache.spark.ml.attribute.NumericAttribute; @@ -43,14 +42,14 @@ public class JavaVectorSlicerExample { .getOrCreate(); // $example on$ - Attribute[] attrs = new Attribute[]{ + Attribute[] attrs = { NumericAttribute.defaultAttr().withName("f1"), NumericAttribute.defaultAttr().withName("f2"), NumericAttribute.defaultAttr().withName("f3") }; AttributeGroup group = new AttributeGroup("userFeatures", attrs); - List<Row> data = Lists.newArrayList( + List<Row> data = Arrays.asList( RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})), RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0)) ); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java index 189560e..5f43603 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java @@ -38,9 +38,9 @@ public class JavaAssociationRulesExample { // $example on$ JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList( - new FreqItemset<String>(new String[] {"a"}, 15L), - new FreqItemset<String>(new String[] {"b"}, 35L), - new FreqItemset<String>(new String[] {"a", "b"}, 12L) + new FreqItemset<>(new String[] {"a"}, 15L), + new FreqItemset<>(new String[] {"b"}, 35L), + new FreqItemset<>(new String[] {"a", "b"}, 12L) )); AssociationRules arules = new AssociationRules() http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java index 12aa14f..b9d0313 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java @@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.classification.LogisticRegressionModel; import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; @@ -46,7 +45,7 @@ public class JavaBinaryClassificationMetricsExample { JavaRDD<LabeledPoint> test = splits[1]; // Run training algorithm to build the model. 
- final LogisticRegressionModel model = new LogisticRegressionWithLBFGS() + LogisticRegressionModel model = new LogisticRegressionWithLBFGS() .setNumClasses(2) .run(training.rdd()); @@ -54,15 +53,8 @@ public class JavaBinaryClassificationMetricsExample { model.clearThreshold(); // Compute raw scores on the test set. - JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map( - new Function<LabeledPoint, Tuple2<Object, Object>>() { - @Override - public Tuple2<Object, Object> call(LabeledPoint p) { - Double prediction = model.predict(p.features()); - return new Tuple2<Object, Object>(prediction, p.label()); - } - } - ); + JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p -> + new Tuple2<>(model.predict(p.features()), p.label())); // Get evaluation metrics. BinaryClassificationMetrics metrics = @@ -73,32 +65,25 @@ public class JavaBinaryClassificationMetricsExample { System.out.println("Precision by threshold: " + precision.collect()); // Recall by threshold - JavaRDD<Tuple2<Object, Object>> recall = metrics.recallByThreshold().toJavaRDD(); + JavaRDD<?> recall = metrics.recallByThreshold().toJavaRDD(); System.out.println("Recall by threshold: " + recall.collect()); // F Score by threshold - JavaRDD<Tuple2<Object, Object>> f1Score = metrics.fMeasureByThreshold().toJavaRDD(); + JavaRDD<?> f1Score = metrics.fMeasureByThreshold().toJavaRDD(); System.out.println("F1 Score by threshold: " + f1Score.collect()); - JavaRDD<Tuple2<Object, Object>> f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD(); + JavaRDD<?> f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD(); System.out.println("F2 Score by threshold: " + f2Score.collect()); // Precision-recall curve - JavaRDD<Tuple2<Object, Object>> prc = metrics.pr().toJavaRDD(); + JavaRDD<?> prc = metrics.pr().toJavaRDD(); System.out.println("Precision-recall curve: " + prc.collect()); // Thresholds - JavaRDD<Double> thresholds = precision.map( - new Function<Tuple2<Object, Object>, Double>() { - @Override - public Double call(Tuple2<Object, Object> t) { - return new Double(t._1().toString()); - } - } - ); + JavaRDD<Double> thresholds = precision.map(t -> Double.parseDouble(t._1().toString())); // ROC Curve - JavaRDD<Tuple2<Object, Object>> roc = metrics.roc().toJavaRDD(); + JavaRDD<?> roc = metrics.roc().toJavaRDD(); System.out.println("ROC curve: " + roc.collect()); // AUPRC http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java index c600094..f878b55 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java @@ -17,10 +17,9 @@ package org.apache.spark.examples.mllib; -import java.util.ArrayList; - // $example on$ -import com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.List; // $example off$ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -41,7 +40,7 @@ public class JavaBisectingKMeansExample { JavaSparkContext sc = new JavaSparkContext(sparkConf); // $example on$ - ArrayList<Vector> localData = Lists.newArrayList( + List<Vector> localData = Arrays.asList( Vectors.dense(0.1, 0.1), 
Vectors.dense(0.3, 0.3), Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3), Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3), http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java index ad44acb..ce354af 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java @@ -19,10 +19,8 @@ package org.apache.spark.examples.mllib; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.VoidFunction; // $example on$ import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.feature.ChiSqSelector; import org.apache.spark.mllib.feature.ChiSqSelectorModel; import org.apache.spark.mllib.linalg.Vectors; @@ -42,41 +40,25 @@ public class JavaChiSqSelectorExample { // Discretize data in 16 equal bins since ChiSqSelector requires categorical features // Although features are doubles, the ChiSqSelector treats each unique value as a category - JavaRDD<LabeledPoint> discretizedData = points.map( - new Function<LabeledPoint, LabeledPoint>() { - @Override - public LabeledPoint call(LabeledPoint lp) { - final double[] discretizedFeatures = new double[lp.features().size()]; - for (int i = 0; i < lp.features().size(); ++i) { - discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16); - } - return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures)); - } + JavaRDD<LabeledPoint> discretizedData = points.map(lp -> { + double[] discretizedFeatures = new double[lp.features().size()]; + for (int i = 0; i < lp.features().size(); ++i) { + discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16); } - ); + return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures)); + }); // Create ChiSqSelector that will select top 50 of 692 features ChiSqSelector selector = new ChiSqSelector(50); // Create ChiSqSelector model (selecting features) - final ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd()); + ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd()); // Filter the top 50 features from each feature vector - JavaRDD<LabeledPoint> filteredData = discretizedData.map( - new Function<LabeledPoint, LabeledPoint>() { - @Override - public LabeledPoint call(LabeledPoint lp) { - return new LabeledPoint(lp.label(), transformer.transform(lp.features())); - } - } - ); + JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp -> + new LabeledPoint(lp.label(), transformer.transform(lp.features()))); // $example off$ System.out.println("filtered data: "); - filteredData.foreach(new VoidFunction<LabeledPoint>() { - @Override - public void call(LabeledPoint labeledPoint) throws Exception { - System.out.println(labeledPoint.toString()); - } - }); + filteredData.foreach(System.out::println); jsc.stop(); } http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java ---------------------------------------------------------------------- diff --git 
a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java index 66387b9..032c168 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java @@ -27,8 +27,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.DecisionTree; import org.apache.spark.mllib.tree.model.DecisionTreeModel; @@ -53,31 +51,21 @@ class JavaDecisionTreeClassificationExample { // Set parameters. // Empty categoricalFeaturesInfo indicates all features are continuous. - Integer numClasses = 2; + int numClasses = 2; Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>(); String impurity = "gini"; - Integer maxDepth = 5; - Integer maxBins = 32; + int maxDepth = 5; + int maxBins = 32; // Train a DecisionTree model for classification. - final DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses, + DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins); // Evaluate model on test instances and compute test error JavaPairRDD<Double, Double> predictionAndLabel = - testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() { - @Override - public Tuple2<Double, Double> call(LabeledPoint p) { - return new Tuple2<>(model.predict(p.features()), p.label()); - } - }); - Double testErr = - 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() { - @Override - public Boolean call(Tuple2<Double, Double> pl) { - return !pl._1().equals(pl._2()); - } - }).count() / testData.count(); + testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label())); + double testErr = + predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count(); System.out.println("Test Error: " + testErr); System.out.println("Learned classification tree model:\n" + model.toDebugString()); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java index 904e7f7..f222c38 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java @@ -27,9 +27,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import 
org.apache.spark.mllib.tree.DecisionTree; import org.apache.spark.mllib.tree.model.DecisionTreeModel; @@ -56,34 +53,20 @@ class JavaDecisionTreeRegressionExample { // Empty categoricalFeaturesInfo indicates all features are continuous. Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>(); String impurity = "variance"; - Integer maxDepth = 5; - Integer maxBins = 32; + int maxDepth = 5; + int maxBins = 32; // Train a DecisionTree model. - final DecisionTreeModel model = DecisionTree.trainRegressor(trainingData, + DecisionTreeModel model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity, maxDepth, maxBins); // Evaluate model on test instances and compute test error JavaPairRDD<Double, Double> predictionAndLabel = - testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() { - @Override - public Tuple2<Double, Double> call(LabeledPoint p) { - return new Tuple2<>(model.predict(p.features()), p.label()); - } - }); - Double testMSE = - predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() { - @Override - public Double call(Tuple2<Double, Double> pl) { - Double diff = pl._1() - pl._2(); - return diff * diff; - } - }).reduce(new Function2<Double, Double, Double>() { - @Override - public Double call(Double a, Double b) { - return a + b; - } - }) / data.count(); + testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label())); + double testMSE = predictionAndLabel.mapToDouble(pl -> { + double diff = pl._1() - pl._2(); + return diff * diff; + }).mean(); System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression tree model:\n" + model.toDebugString()); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java index c8ce6ab..2d45c61 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java @@ -25,12 +25,10 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; // $example on$ import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.feature.ElementwiseProduct; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; // $example off$ -import org.apache.spark.api.java.function.VoidFunction; public class JavaElementwiseProductExample { public static void main(String[] args) { @@ -43,35 +41,18 @@ public class JavaElementwiseProductExample { JavaRDD<Vector> data = jsc.parallelize(Arrays.asList( Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0))); Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0); - final ElementwiseProduct transformer = new ElementwiseProduct(transformingVector); + ElementwiseProduct transformer = new ElementwiseProduct(transformingVector); // Batch transform and per-row transform give the same results: JavaRDD<Vector> transformedData = transformer.transform(data); - JavaRDD<Vector> transformedData2 = data.map( - new Function<Vector, Vector>() { - @Override - public Vector call(Vector v) { - return transformer.transform(v); - } - } - ); + 
JavaRDD<Vector> transformedData2 = data.map(transformer::transform); // $example off$ System.out.println("transformedData: "); - transformedData.foreach(new VoidFunction<Vector>() { - @Override - public void call(Vector vector) throws Exception { - System.out.println(vector.toString()); - } - }); + transformedData.foreach(System.out::println); System.out.println("transformedData2: "); - transformedData2.foreach(new VoidFunction<Vector>() { - @Override - public void call(Vector vector) throws Exception { - System.out.println(vector.toString()); - } - }); + transformedData2.foreach(System.out::println); jsc.stop(); } http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java index 3124411..5792e5a 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java @@ -22,7 +22,6 @@ import org.apache.spark.api.java.JavaSparkContext; // $example on$ import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.clustering.GaussianMixture; import org.apache.spark.mllib.clustering.GaussianMixtureModel; import org.apache.spark.mllib.linalg.Vector; @@ -39,18 +38,14 @@ public class JavaGaussianMixtureExample { // Load and parse data String path = "data/mllib/gmm_data.txt"; JavaRDD<String> data = jsc.textFile(path); - JavaRDD<Vector> parsedData = data.map( - new Function<String, Vector>() { - public Vector call(String s) { - String[] sarray = s.trim().split(" "); - double[] values = new double[sarray.length]; - for (int i = 0; i < sarray.length; i++) { - values[i] = Double.parseDouble(sarray[i]); - } - return Vectors.dense(values); - } + JavaRDD<Vector> parsedData = data.map(s -> { + String[] sarray = s.trim().split(" "); + double[] values = new double[sarray.length]; + for (int i = 0; i < sarray.length; i++) { + values[i] = Double.parseDouble(sarray[i]); } - ); + return Vectors.dense(values); + }); parsedData.cache(); // Cluster the data into two classes using GaussianMixture http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java index 213949e..521ee96 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingClassificationExample.java @@ -27,8 +27,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.GradientBoostedTrees; import 
org.apache.spark.mllib.tree.configuration.BoostingStrategy; @@ -61,24 +59,13 @@ public class JavaGradientBoostingClassificationExample { Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>(); boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo); - final GradientBoostedTreesModel model = - GradientBoostedTrees.train(trainingData, boostingStrategy); + GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy); // Evaluate model on test instances and compute test error JavaPairRDD<Double, Double> predictionAndLabel = - testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() { - @Override - public Tuple2<Double, Double> call(LabeledPoint p) { - return new Tuple2<>(model.predict(p.features()), p.label()); - } - }); - Double testErr = - 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() { - @Override - public Boolean call(Tuple2<Double, Double> pl) { - return !pl._1().equals(pl._2()); - } - }).count() / testData.count(); + testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label())); + double testErr = + predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count(); System.out.println("Test Error: " + testErr); System.out.println("Learned classification GBT model:\n" + model.toDebugString()); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java index 78db442..b345d19 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGradientBoostingRegressionExample.java @@ -24,12 +24,9 @@ import java.util.Map; import scala.Tuple2; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.GradientBoostedTrees; import org.apache.spark.mllib.tree.configuration.BoostingStrategy; @@ -60,30 +57,15 @@ public class JavaGradientBoostingRegressionExample { Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>(); boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo); - final GradientBoostedTreesModel model = - GradientBoostedTrees.train(trainingData, boostingStrategy); + GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy); // Evaluate model on test instances and compute test error JavaPairRDD<Double, Double> predictionAndLabel = - testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() { - @Override - public Tuple2<Double, Double> call(LabeledPoint p) { - return new Tuple2<>(model.predict(p.features()), p.label()); - } - }); - Double testMSE = - predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() { - @Override - public Double call(Tuple2<Double, Double> pl) { - Double diff = pl._1() - 
pl._2(); - return diff * diff; - } - }).reduce(new Function2<Double, Double, Double>() { - @Override - public Double call(Double a, Double b) { - return a + b; - } - }) / data.count(); + testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label())); + double testMSE = predictionAndLabel.mapToDouble(pl -> { + double diff = pl._1() - pl._2(); + return diff * diff; + }).mean(); System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression GBT model:\n" + model.toDebugString()); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java index a30b5f1..adebafe 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java @@ -20,9 +20,6 @@ package org.apache.spark.examples.mllib; import scala.Tuple2; import scala.Tuple3; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; @@ -42,14 +39,8 @@ public class JavaIsotonicRegressionExample { jsc.sc(), "data/mllib/sample_isotonic_regression_libsvm_data.txt").toJavaRDD(); // Create label, feature, weight tuples from input data with weight set to default value 1.0. - JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map( - new Function<LabeledPoint, Tuple3<Double, Double, Double>>() { - public Tuple3<Double, Double, Double> call(LabeledPoint point) { - return new Tuple3<>(new Double(point.label()), - new Double(point.features().apply(0)), 1.0); - } - } - ); + JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(point -> + new Tuple3<>(point.label(), point.features().apply(0), 1.0)); // Split data into training (60%) and test (40%) sets. JavaRDD<Tuple3<Double, Double, Double>>[] splits = @@ -59,29 +50,17 @@ public class JavaIsotonicRegressionExample { // Create isotonic regression model from training data. // Isotonic parameter defaults to true so it is only shown for demonstration - final IsotonicRegressionModel model = - new IsotonicRegression().setIsotonic(true).run(training); + IsotonicRegressionModel model = new IsotonicRegression().setIsotonic(true).run(training); // Create tuples of predicted and real labels. - JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair( - new PairFunction<Tuple3<Double, Double, Double>, Double, Double>() { - @Override - public Tuple2<Double, Double> call(Tuple3<Double, Double, Double> point) { - Double predictedLabel = model.predict(point._2()); - return new Tuple2<>(predictedLabel, point._1()); - } - } - ); + JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(point -> + new Tuple2<>(model.predict(point._2()), point._1())); // Calculate mean squared error between predicted and real labels. 
- Double meanSquaredError = new JavaDoubleRDD(predictionAndLabel.map( - new Function<Tuple2<Double, Double>, Object>() { - @Override - public Object call(Tuple2<Double, Double> pl) { - return Math.pow(pl._1() - pl._2(), 2); - } - } - ).rdd()).mean(); + double meanSquaredError = predictionAndLabel.mapToDouble(pl -> { + double diff = pl._1() - pl._2(); + return diff * diff; + }).mean(); System.out.println("Mean Squared Error = " + meanSquaredError); // Save and load model http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java index 2d89c76..f172756 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java @@ -22,7 +22,6 @@ import org.apache.spark.api.java.JavaSparkContext; // $example on$ import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.clustering.KMeans; import org.apache.spark.mllib.clustering.KMeansModel; import org.apache.spark.mllib.linalg.Vector; @@ -39,18 +38,14 @@ public class JavaKMeansExample { // Load and parse data String path = "data/mllib/kmeans_data.txt"; JavaRDD<String> data = jsc.textFile(path); - JavaRDD<Vector> parsedData = data.map( - new Function<String, Vector>() { - public Vector call(String s) { - String[] sarray = s.split(" "); - double[] values = new double[sarray.length]; - for (int i = 0; i < sarray.length; i++) { - values[i] = Double.parseDouble(sarray[i]); - } - return Vectors.dense(values); - } + JavaRDD<Vector> parsedData = data.map(s -> { + String[] sarray = s.split(" "); + double[] values = new double[sarray.length]; + for (int i = 0; i < sarray.length; i++) { + values[i] = Double.parseDouble(sarray[i]); } - ); + return Vectors.dense(values); + }); parsedData.cache(); // Cluster the data into two classes using KMeans http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java index f6f91f4..3fdc03a 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java @@ -23,7 +23,6 @@ import java.util.Arrays; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.classification.LogisticRegressionModel; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; import org.apache.spark.mllib.linalg.Vector; @@ -50,12 +49,8 @@ public class JavaLBFGSExample { JavaRDD<LabeledPoint> test = data.subtract(trainingInit); // Append 1 into the training data as intercept. 
- JavaRDD<Tuple2<Object, Vector>> training = data.map( - new Function<LabeledPoint, Tuple2<Object, Vector>>() { - public Tuple2<Object, Vector> call(LabeledPoint p) { - return new Tuple2<Object, Vector>(p.label(), MLUtils.appendBias(p.features())); - } - }); + JavaPairRDD<Object, Vector> training = data.mapToPair(p -> + new Tuple2<>(p.label(), MLUtils.appendBias(p.features()))); training.cache(); // Run training algorithm to build the model. @@ -77,7 +72,7 @@ public class JavaLBFGSExample { Vector weightsWithIntercept = result._1(); double[] loss = result._2(); - final LogisticRegressionModel model = new LogisticRegressionModel( + LogisticRegressionModel model = new LogisticRegressionModel( Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)), (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]); @@ -85,13 +80,8 @@ public class JavaLBFGSExample { model.clearThreshold(); // Compute raw scores on the test set. - JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map( - new Function<LabeledPoint, Tuple2<Object, Object>>() { - public Tuple2<Object, Object> call(LabeledPoint p) { - Double score = model.predict(p.features()); - return new Tuple2<Object, Object>(score, p.label()); - } - }); + JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p -> + new Tuple2<>(model.predict(p.features()), p.label())); // Get evaluation metrics. BinaryClassificationMetrics metrics = @@ -99,8 +89,9 @@ public class JavaLBFGSExample { double auROC = metrics.areaUnderROC(); System.out.println("Loss of each step in training process"); - for (double l : loss) + for (double l : loss) { System.out.println(l); + } System.out.println("Area under ROC = " + auROC); // $example off$ http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java index 578564e..887edf8 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java @@ -25,7 +25,6 @@ import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.clustering.DistributedLDAModel; import org.apache.spark.mllib.clustering.LDA; import org.apache.spark.mllib.clustering.LDAModel; @@ -44,28 +43,17 @@ public class JavaLatentDirichletAllocationExample { // Load and parse the data String path = "data/mllib/sample_lda_data.txt"; JavaRDD<String> data = jsc.textFile(path); - JavaRDD<Vector> parsedData = data.map( - new Function<String, Vector>() { - public Vector call(String s) { - String[] sarray = s.trim().split(" "); - double[] values = new double[sarray.length]; - for (int i = 0; i < sarray.length; i++) { - values[i] = Double.parseDouble(sarray[i]); - } - return Vectors.dense(values); - } + JavaRDD<Vector> parsedData = data.map(s -> { + String[] sarray = s.trim().split(" "); + double[] values = new double[sarray.length]; + for (int i = 0; i < sarray.length; i++) { + values[i] = Double.parseDouble(sarray[i]); } - ); + return Vectors.dense(values); + }); // Index 
documents with unique IDs JavaPairRDD<Long, Vector> corpus = - JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map( - new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() { - public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) { - return doc_id.swap(); - } - } - ) - ); + JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap)); corpus.cache(); // Cluster the documents into three topics using LDA http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java index 9ca9a78..324a781 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java @@ -23,9 +23,8 @@ import org.apache.spark.api.java.JavaSparkContext; // $example on$ import scala.Tuple2; -import org.apache.spark.api.java.JavaDoubleRDD; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.regression.LinearRegressionModel; @@ -44,43 +43,31 @@ public class JavaLinearRegressionWithSGDExample { // Load and parse the data String path = "data/mllib/ridge-data/lpsa.data"; JavaRDD<String> data = sc.textFile(path); - JavaRDD<LabeledPoint> parsedData = data.map( - new Function<String, LabeledPoint>() { - public LabeledPoint call(String line) { - String[] parts = line.split(","); - String[] features = parts[1].split(" "); - double[] v = new double[features.length]; - for (int i = 0; i < features.length - 1; i++) { - v[i] = Double.parseDouble(features[i]); - } - return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); - } + JavaRDD<LabeledPoint> parsedData = data.map(line -> { + String[] parts = line.split(","); + String[] features = parts[1].split(" "); + double[] v = new double[features.length]; + for (int i = 0; i < features.length - 1; i++) { + v[i] = Double.parseDouble(features[i]); } - ); + return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); + }); parsedData.cache(); // Building the model int numIterations = 100; double stepSize = 0.00000001; - final LinearRegressionModel model = + LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize); // Evaluate model on training examples and compute training error - JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map( - new Function<LabeledPoint, Tuple2<Double, Double>>() { - public Tuple2<Double, Double> call(LabeledPoint point) { - double prediction = model.predict(point.features()); - return new Tuple2<>(prediction, point.label()); - } - } - ); - double MSE = new JavaDoubleRDD(valuesAndPreds.map( - new Function<Tuple2<Double, Double>, Object>() { - public Object call(Tuple2<Double, Double> pair) { - return Math.pow(pair._1() - pair._2(), 2.0); - } - } - ).rdd()).mean(); + JavaPairRDD<Double, Double> valuesAndPreds = parsedData.mapToPair(point -> + new Tuple2<>(model.predict(point.features()), point.label())); + + double MSE = 
http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
index 9ca9a78..324a781 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java
@@ -23,9 +23,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 // $example on$
 import scala.Tuple2;
 
-import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.regression.LinearRegressionModel;
@@ -44,43 +43,31 @@ public class JavaLinearRegressionWithSGDExample {
     // Load and parse the data
     String path = "data/mllib/ridge-data/lpsa.data";
     JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<LabeledPoint> parsedData = data.map(
-      new Function<String, LabeledPoint>() {
-        public LabeledPoint call(String line) {
-          String[] parts = line.split(",");
-          String[] features = parts[1].split(" ");
-          double[] v = new double[features.length];
-          for (int i = 0; i < features.length - 1; i++) {
-            v[i] = Double.parseDouble(features[i]);
-          }
-          return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
-        }
-      }
-    );
+    JavaRDD<LabeledPoint> parsedData = data.map(line -> {
+      String[] parts = line.split(",");
+      String[] features = parts[1].split(" ");
+      double[] v = new double[features.length];
+      for (int i = 0; i < features.length - 1; i++) {
+        v[i] = Double.parseDouble(features[i]);
+      }
+      return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
+    });
     parsedData.cache();
 
     // Building the model
     int numIterations = 100;
     double stepSize = 0.00000001;
-    final LinearRegressionModel model =
+    LinearRegressionModel model =
       LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);
 
     // Evaluate model on training examples and compute training error
-    JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
-      new Function<LabeledPoint, Tuple2<Double, Double>>() {
-        public Tuple2<Double, Double> call(LabeledPoint point) {
-          double prediction = model.predict(point.features());
-          return new Tuple2<>(prediction, point.label());
-        }
-      }
-    );
-    double MSE = new JavaDoubleRDD(valuesAndPreds.map(
-      new Function<Tuple2<Double, Double>, Object>() {
-        public Object call(Tuple2<Double, Double> pair) {
-          return Math.pow(pair._1() - pair._2(), 2.0);
-        }
-      }
-    ).rdd()).mean();
+    JavaPairRDD<Double, Double> valuesAndPreds = parsedData.mapToPair(point ->
+      new Tuple2<>(model.predict(point.features()), point.label()));
+
+    double MSE = valuesAndPreds.mapToDouble(pair -> {
+      double diff = pair._1() - pair._2();
+      return diff * diff;
+    }).mean();
     System.out.println("training Mean Squared Error = " + MSE);
 
     // Save and load model

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
index 7fc371e..26b8a6e 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java
@@ -23,8 +23,8 @@ import org.apache.spark.SparkContext;
 // $example on$
 import scala.Tuple2;
 
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
 import org.apache.spark.mllib.evaluation.MulticlassMetrics;
@@ -49,19 +49,13 @@ public class JavaLogisticRegressionWithLBFGSExample {
     JavaRDD<LabeledPoint> test = splits[1];
 
     // Run training algorithm to build the model.
-    final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
       .setNumClasses(10)
       .run(training.rdd());
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double prediction = model.predict(p.features());
-          return new Tuple2<Object, Object>(prediction, p.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
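The linear-regression diff above also shows the cleaner route to a JavaDoubleRDD: instead of mapping to boxed Objects and wrapping the raw rdd() in new JavaDoubleRDD(...), mapToDouble produces one directly and mean() finishes the MSE. A sketch with invented numbers; parallelizePairs and the values are illustrative, not from the patch:

    import java.util.Arrays;

    import scala.Tuple2;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class MseSketch {
      public static void main(String[] args) {
        JavaSparkContext sc =
          new JavaSparkContext(new SparkConf().setAppName("mse").setMaster("local"));
        JavaPairRDD<Double, Double> predsAndLabels = sc.parallelizePairs(Arrays.asList(
          new Tuple2<>(1.1, 1.0), new Tuple2<>(2.2, 2.0), new Tuple2<>(2.9, 3.0)));
        double mse = predsAndLabels.mapToDouble(pair -> {
          double diff = pair._1() - pair._2();
          return diff * diff;  // squared error for one record
        }).mean();
        System.out.println("MSE = " + mse);
        sc.stop();
      }
    }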
http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
index 2d12bdd..0367038 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
 import org.apache.spark.mllib.evaluation.MulticlassMetrics;
@@ -46,19 +45,13 @@ public class JavaMulticlassClassificationMetricsExample {
     JavaRDD<LabeledPoint> test = splits[1];
 
     // Run training algorithm to build the model.
-    final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
+    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
       .setNumClasses(3)
      .run(training.rdd());
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double prediction = model.predict(p.features());
-          return new Tuple2<Object, Object>(prediction, p.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
index f4ec04b..d80dbe8 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaNaiveBayesExample.java
@@ -19,8 +19,6 @@ package org.apache.spark.examples.mllib;
 
 // $example on$
 import scala.Tuple2;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -41,20 +39,11 @@ public class JavaNaiveBayesExample {
     JavaRDD<LabeledPoint>[] tmp = inputData.randomSplit(new double[]{0.6, 0.4});
     JavaRDD<LabeledPoint> training = tmp[0]; // training set
     JavaRDD<LabeledPoint> test = tmp[1]; // test set
-    final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
+    NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
     JavaPairRDD<Double, Double> predictionAndLabel =
-      test.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    double accuracy = predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-      @Override
-      public Boolean call(Tuple2<Double, Double> pl) {
-        return pl._1().equals(pl._2());
-      }
-    }).count() / (double) test.count();
+      test.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double accuracy =
+      predictionAndLabel.filter(pl -> pl._1().equals(pl._2())).count() / (double) test.count();
 
     // Save and load model
     model.save(jsc.sc(), "target/tmp/myNaiveBayesModel");
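One detail to watch in these condensed accuracy and error one-liners: count() returns a long, so one operand must be cast to double, as above, or the division truncates before the assignment. A two-line illustration with invented numbers:

    long correct = 942;
    long total = 1000;
    double truncated = correct / total;          // long division happens first: 0.0
    double accuracy = correct / (double) total;  // 0.942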
http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
index 91c3bd7..5155f18 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
@@ -17,9 +17,9 @@
 
 package org.apache.spark.examples.mllib;
 
-import scala.Tuple3;
+import java.util.Arrays;
 
-import com.google.common.collect.Lists;
+import scala.Tuple3;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -39,7 +39,7 @@ public class JavaPowerIterationClusteringExample {
 
     @SuppressWarnings("unchecked")
     // $example on$
-    JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList(
+    JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
       new Tuple3<>(0L, 1L, 0.9),
       new Tuple3<>(1L, 2L, 0.9),
       new Tuple3<>(2L, 3L, 0.9),
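Swapping Guava's Lists.newArrayList for java.util.Arrays.asList drops a third-party dependency from the example; the fixed-size list Arrays.asList returns is enough when the collection is only handed to parallelize. A trivial sketch, names invented:

    import java.util.Arrays;
    import java.util.List;

    public class AsListSketch {
      public static void main(String[] args) {
        List<Double> weights = Arrays.asList(0.9, 0.9, 0.9);  // fixed-size view, no Guava needed
        // weights.add(0.1) would throw UnsupportedOperationException; fine for read-only use
        System.out.println(weights);
      }
    }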
http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
index 24af5d0..6998ce2 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestClassificationExample.java
@@ -19,6 +19,7 @@ package org.apache.spark.examples.mllib;
 
 // $example on$
 import java.util.HashMap;
+import java.util.Map;
 
 import scala.Tuple2;
 
@@ -26,8 +27,6 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.RandomForest;
 import org.apache.spark.mllib.tree.model.RandomForestModel;
@@ -50,7 +49,7 @@ public class JavaRandomForestClassificationExample {
     // Train a RandomForest model.
     // Empty categoricalFeaturesInfo indicates all features are continuous.
     Integer numClasses = 2;
-    HashMap<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
+    Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
     Integer numTrees = 3; // Use more in practice.
     String featureSubsetStrategy = "auto"; // Let the algorithm choose.
     String impurity = "gini";
@@ -58,25 +57,15 @@ public class JavaRandomForestClassificationExample {
     Integer maxBins = 32;
     Integer seed = 12345;
 
-    final RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
+    RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
       categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testErr =
-      1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
-        @Override
-        public Boolean call(Tuple2<Double, Double> pl) {
-          return !pl._1().equals(pl._2());
-        }
-      }).count() / testData.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testErr =
+      predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
     System.out.println("Test Error: " + testErr);
     System.out.println("Learned classification forest model:\n" + model.toDebugString());
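The repeated dropping of final on model variables (NaiveBayes and the forests above, the regression models earlier) is also a Java 8 effect: anonymous inner classes could only capture locals explicitly declared final, while lambdas may capture anything effectively final, so the keyword becomes noise. A pure-Java illustration, names invented:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class EffectivelyFinalSketch {
      public static void main(String[] args) {
        int offset = 10;  // never reassigned, so effectively final: no keyword required
        List<Integer> shifted = Arrays.asList(1, 2, 3).stream()
          .map(x -> x + offset)  // the lambda may capture offset
          .collect(Collectors.toList());
        System.out.println(shifted);  // [11, 12, 13]
      }
    }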
http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
index afa9045..4a0f55f 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRandomForestRegressionExample.java
@@ -23,12 +23,9 @@ import java.util.Map;
 
 import scala.Tuple2;
 
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.tree.RandomForest;
 import org.apache.spark.mllib.tree.model.RandomForestModel;
@@ -52,37 +49,23 @@ public class JavaRandomForestRegressionExample {
     // Set parameters.
     // Empty categoricalFeaturesInfo indicates all features are continuous.
     Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
-    Integer numTrees = 3; // Use more in practice.
+    int numTrees = 3; // Use more in practice.
     String featureSubsetStrategy = "auto"; // Let the algorithm choose.
     String impurity = "variance";
-    Integer maxDepth = 4;
-    Integer maxBins = 32;
-    Integer seed = 12345;
+    int maxDepth = 4;
+    int maxBins = 32;
+    int seed = 12345;
 
     // Train a RandomForest model.
-    final RandomForestModel model = RandomForest.trainRegressor(trainingData,
+    RandomForestModel model = RandomForest.trainRegressor(trainingData,
       categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed);
 
     // Evaluate model on test instances and compute test error
     JavaPairRDD<Double, Double> predictionAndLabel =
-      testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
-        @Override
-        public Tuple2<Double, Double> call(LabeledPoint p) {
-          return new Tuple2<>(model.predict(p.features()), p.label());
-        }
-      });
-    Double testMSE =
-      predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
-        @Override
-        public Double call(Tuple2<Double, Double> pl) {
-          Double diff = pl._1() - pl._2();
-          return diff * diff;
-        }
-      }).reduce(new Function2<Double, Double, Double>() {
-        @Override
-        public Double call(Double a, Double b) {
-          return a + b;
-        }
-      }) / testData.count();
+      testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
+    double testMSE = predictionAndLabel.mapToDouble(pl -> {
+      double diff = pl._1() - pl._2();
+      return diff * diff;
+    }).mean();
     System.out.println("Test Mean Squared Error: " + testMSE);
     System.out.println("Learned regression forest model:\n" + model.toDebugString());
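Not every conversion collapses to one line: where the old call method had real logic, the patch keeps it as a block-bodied lambda with braces and an explicit return, as in the mapValues rewrites in the ranking-metrics diff just below. A sketch of that shape with invented data; parallelizePairs and the names are illustrative:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import scala.Tuple2;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class MapValuesSketch {
      public static void main(String[] args) {
        JavaSparkContext sc =
          new JavaSparkContext(new SparkConf().setAppName("mv").setMaster("local"));
        JavaPairRDD<String, List<Integer>> scores = sc.parallelizePairs(Arrays.asList(
          new Tuple2<>("u1", Arrays.asList(3, -1, 5)), new Tuple2<>("u2", Arrays.asList(-2, 4))));
        // Block-bodied lambda: multi-statement logic without anonymous-class scaffolding
        JavaPairRDD<String, List<Integer>> positives = scores.mapValues(xs -> {
          List<Integer> kept = new ArrayList<>();
          for (int x : xs) {
            if (x > 0) {
              kept.add(x);
            }
          }
          return kept;
        });
        positives.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));
        sc.stop();
      }
    }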
http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
index 54dfc40..bd49f05 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
@@ -23,7 +23,6 @@ import java.util.*;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.evaluation.RegressionMetrics;
 import org.apache.spark.mllib.evaluation.RankingMetrics;
 import org.apache.spark.mllib.recommendation.ALS;
@@ -39,93 +38,61 @@ public class JavaRankingMetricsExample {
     // $example on$
     String path = "data/mllib/sample_movielens_data.txt";
     JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<Rating> ratings = data.map(
-      new Function<String, Rating>() {
-        @Override
-        public Rating call(String line) {
-          String[] parts = line.split("::");
-          return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
-            .parseDouble(parts[2]) - 2.5);
-        }
-      }
-    );
+    JavaRDD<Rating> ratings = data.map(line -> {
+      String[] parts = line.split("::");
+      return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
+        .parseDouble(parts[2]) - 2.5);
+    });
     ratings.cache();
 
     // Train an ALS model
-    final MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01);
+    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01);
 
     // Get top 10 recommendations for every user and scale ratings from 0 to 1
     JavaRDD<Tuple2<Object, Rating[]>> userRecs = model.recommendProductsForUsers(10).toJavaRDD();
-    JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(
-      new Function<Tuple2<Object, Rating[]>, Tuple2<Object, Rating[]>>() {
-        @Override
-        public Tuple2<Object, Rating[]> call(Tuple2<Object, Rating[]> t) {
-          Rating[] scaledRatings = new Rating[t._2().length];
-          for (int i = 0; i < scaledRatings.length; i++) {
-            double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
-            scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
-          }
-          return new Tuple2<>(t._1(), scaledRatings);
-        }
-      }
-    );
+    JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(t -> {
+      Rating[] scaledRatings = new Rating[t._2().length];
+      for (int i = 0; i < scaledRatings.length; i++) {
+        double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
+        scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
+      }
+      return new Tuple2<>(t._1(), scaledRatings);
+    });
     JavaPairRDD<Object, Rating[]> userRecommended = JavaPairRDD.fromJavaRDD(userRecsScaled);
 
     // Map ratings to 1 or 0, 1 indicating a movie that should be recommended
-    JavaRDD<Rating> binarizedRatings = ratings.map(
-      new Function<Rating, Rating>() {
-        @Override
-        public Rating call(Rating r) {
-          double binaryRating;
-          if (r.rating() > 0.0) {
-            binaryRating = 1.0;
-          } else {
-            binaryRating = 0.0;
-          }
-          return new Rating(r.user(), r.product(), binaryRating);
-        }
-      }
-    );
+    JavaRDD<Rating> binarizedRatings = ratings.map(r -> {
+      double binaryRating;
+      if (r.rating() > 0.0) {
+        binaryRating = 1.0;
+      } else {
+        binaryRating = 0.0;
+      }
+      return new Rating(r.user(), r.product(), binaryRating);
+    });
 
     // Group ratings by common user
-    JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(
-      new Function<Rating, Object>() {
-        @Override
-        public Object call(Rating r) {
-          return r.user();
-        }
-      }
-    );
+    JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(Rating::user);
 
     // Get true relevant documents from all user ratings
-    JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(
-      new Function<Iterable<Rating>, List<Integer>>() {
-        @Override
-        public List<Integer> call(Iterable<Rating> docs) {
-          List<Integer> products = new ArrayList<>();
-          for (Rating r : docs) {
-            if (r.rating() > 0.0) {
-              products.add(r.product());
-            }
-          }
-          return products;
-        }
-      }
-    );
+    JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(docs -> {
+      List<Integer> products = new ArrayList<>();
+      for (Rating r : docs) {
+        if (r.rating() > 0.0) {
+          products.add(r.product());
+        }
+      }
+      return products;
+    });
 
     // Extract the product id from each recommendation
-    JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(
-      new Function<Rating[], List<Integer>>() {
-        @Override
-        public List<Integer> call(Rating[] docs) {
-          List<Integer> products = new ArrayList<>();
-          for (Rating r : docs) {
-            products.add(r.product());
-          }
-          return products;
-        }
-      }
-    );
+    JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(docs -> {
+      List<Integer> products = new ArrayList<>();
+      for (Rating r : docs) {
+        products.add(r.product());
+      }
+      return products;
+    });
     JavaRDD<Tuple2<List<Integer>, List<Integer>>> relevantDocs = userMoviesList.join(
       userRecommendedList).values();
@@ -143,33 +110,15 @@ public class JavaRankingMetricsExample {
     System.out.format("Mean average precision = %f\n", metrics.meanAveragePrecision());
 
     // Evaluate the model using numerical ratings and regression metrics
-    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
-      new Function<Rating, Tuple2<Object, Object>>() {
-        @Override
-        public Tuple2<Object, Object> call(Rating r) {
-          return new Tuple2<Object, Object>(r.user(), r.product());
-        }
-      }
-    );
+    JavaRDD<Tuple2<Object, Object>> userProducts =
+      ratings.map(r -> new Tuple2<>(r.user(), r.product()));
+
     JavaPairRDD<Tuple2<Integer, Integer>, Object> predictions = JavaPairRDD.fromJavaRDD(
-      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
-        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
-          @Override
-          public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
-            return new Tuple2<Tuple2<Integer, Integer>, Object>(
-              new Tuple2<>(r.user(), r.product()), r.rating());
-          }
-        }
-      ));
+      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(r ->
+        new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())));
     JavaRDD<Tuple2<Object, Object>> ratesAndPreds =
-      JavaPairRDD.fromJavaRDD(ratings.map(
-        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
-          @Override
-          public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
-            return new Tuple2<Tuple2<Integer, Integer>, Object>(
-              new Tuple2<>(r.user(), r.product()), r.rating());
-          }
-        }
+      JavaPairRDD.fromJavaRDD(ratings.map(r ->
+        new Tuple2<Tuple2<Integer, Integer>, Object>(new Tuple2<>(r.user(), r.product()), r.rating())
       )).join(predictions).values();
 
     // Create regression metrics object
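As a closing aside, some of the surviving block lambdas could shrink further; the 1-or-0 binarization in the ranking-metrics diff above is a natural conditional expression, though the patch keeps the block form and which reads better is a matter of taste. A sketch of the alternative; the data is invented, the Rating class is the same one the example uses:

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.recommendation.Rating;

    public class BinarizeSketch {
      public static void main(String[] args) {
        JavaSparkContext sc =
          new JavaSparkContext(new SparkConf().setAppName("bin").setMaster("local"));
        JavaRDD<Rating> ratings = sc.parallelize(Arrays.asList(
          new Rating(1, 10, 2.5), new Rating(1, 11, -1.0), new Rating(2, 10, 0.5)));
        // Conditional expression instead of the if/else block used above
        JavaRDD<Rating> binarized = ratings.map(r ->
          new Rating(r.user(), r.product(), r.rating() > 0.0 ? 1.0 : 0.0));
        binarized.collect().forEach(r ->
          System.out.println(r.user() + "," + r.product() + " -> " + r.rating()));
        sc.stop();
      }
    }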
