Repository: spark
Updated Branches:
  refs/heads/master ba8912e5f -> de14d35f7
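
The diffs below replace anonymous inner classes (Function, FlatMapFunction, PairFunction, Function2, VoidFunction and friends) in the Java examples with Java 8 lambdas and method references. A minimal standalone sketch of the before/after pattern, assuming a local SparkContext and illustrative in-memory input that is not part of the commit (the real examples read from files, Kafka, or sockets):

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import scala.Tuple2;

    public class LambdaConversionSketch {
      public static void main(String[] args) {
        JavaSparkContext sc =
            new JavaSparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"));
        // Illustrative input; the examples in the commit read from files or streams instead.
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("a b", "b c"));

        // Old style: anonymous inner class implementing the Java functional interface.
        JavaRDD<String> wordsOld = lines.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterator<String> call(String x) {
            return Arrays.asList(x.split(" ")).iterator();
          }
        });

        // New style, as in the diff: lambdas and method references with the same semantics.
        JavaRDD<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        JavaPairRDD<String, Integer> counts =
            words.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey((i1, i2) -> i1 + i2);

        counts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
        sc.stop();
      }
    }

The mapToPair plus reduceByKey chain mirrors the word-count hunks in JavaNetworkWordCount, JavaCustomReceiver, and the Kafka examples below.
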
http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java index f69aa4b..1ee68da 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java @@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.recommendation.ALS; import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; import org.apache.spark.mllib.recommendation.Rating; @@ -37,15 +36,12 @@ public class JavaRecommendationExample { // Load and parse the data String path = "data/mllib/als/test.data"; JavaRDD<String> data = jsc.textFile(path); - JavaRDD<Rating> ratings = data.map( - new Function<String, Rating>() { - public Rating call(String s) { - String[] sarray = s.split(","); - return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), - Double.parseDouble(sarray[2])); - } - } - ); + JavaRDD<Rating> ratings = data.map(s -> { + String[] sarray = s.split(","); + return new Rating(Integer.parseInt(sarray[0]), + Integer.parseInt(sarray[1]), + Double.parseDouble(sarray[2])); + }); // Build the recommendation model using ALS int rank = 10; @@ -53,37 +49,19 @@ public class JavaRecommendationExample { MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01); // Evaluate the model on rating data - JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map( - new Function<Rating, Tuple2<Object, Object>>() { - public Tuple2<Object, Object> call(Rating r) { - return new Tuple2<Object, Object>(r.user(), r.product()); - } - } - ); + JavaRDD<Tuple2<Object, Object>> userProducts = + ratings.map(r -> new Tuple2<>(r.user(), r.product())); JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD( - model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( - new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() { - public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){ - return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()); - } - } - )); - JavaRDD<Tuple2<Double, Double>> ratesAndPreds = - JavaPairRDD.fromJavaRDD(ratings.map( - new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() { - public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){ - return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()); - } - } - )).join(predictions).values(); - double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map( - new Function<Tuple2<Double, Double>, Object>() { - public Object call(Tuple2<Double, Double> pair) { - Double err = pair._1() - pair._2(); - return err * err; - } - } - ).rdd()).mean(); + model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD() + .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())) + ); + JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD( + ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))) + .join(predictions).values(); + double MSE = ratesAndPreds.mapToDouble(pair -> { + double err = pair._1() - pair._2(); + 
return err * err; + }).mean(); System.out.println("Mean Squared Error = " + MSE); // Save and load model http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java index b3e5c04..7bb9993 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java @@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib; import scala.Tuple2; import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.regression.LinearRegressionModel; @@ -38,34 +37,24 @@ public class JavaRegressionMetricsExample { // Load and parse the data String path = "data/mllib/sample_linear_regression_data.txt"; JavaRDD<String> data = sc.textFile(path); - JavaRDD<LabeledPoint> parsedData = data.map( - new Function<String, LabeledPoint>() { - public LabeledPoint call(String line) { - String[] parts = line.split(" "); - double[] v = new double[parts.length - 1]; - for (int i = 1; i < parts.length - 1; i++) { - v[i - 1] = Double.parseDouble(parts[i].split(":")[1]); - } - return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); - } + JavaRDD<LabeledPoint> parsedData = data.map(line -> { + String[] parts = line.split(" "); + double[] v = new double[parts.length - 1]; + for (int i = 1; i < parts.length - 1; i++) { + v[i - 1] = Double.parseDouble(parts[i].split(":")[1]); } - ); + return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); + }); parsedData.cache(); // Building the model int numIterations = 100; - final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), + LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); // Evaluate model on training examples and compute training error - JavaRDD<Tuple2<Object, Object>> valuesAndPreds = parsedData.map( - new Function<LabeledPoint, Tuple2<Object, Object>>() { - public Tuple2<Object, Object> call(LabeledPoint point) { - double prediction = model.predict(point.features()); - return new Tuple2<Object, Object>(prediction, point.label()); - } - } - ); + JavaPairRDD<Object, Object> valuesAndPreds = parsedData.mapToPair(point -> + new Tuple2<>(model.predict(point.features()), point.label())); // Instantiate metrics object RegressionMetrics metrics = new RegressionMetrics(valuesAndPreds.rdd()); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java index 720b167..866a221 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java @@ -24,7 +24,6 @@ import org.apache.spark.SparkContext; import 
scala.Tuple2; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.classification.SVMModel; import org.apache.spark.mllib.classification.SVMWithSGD; import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; @@ -50,20 +49,14 @@ public class JavaSVMWithSGDExample { // Run training algorithm to build the model. int numIterations = 100; - final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations); + SVMModel model = SVMWithSGD.train(training.rdd(), numIterations); // Clear the default threshold. model.clearThreshold(); // Compute raw scores on the test set. - JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map( - new Function<LabeledPoint, Tuple2<Object, Object>>() { - public Tuple2<Object, Object> call(LabeledPoint p) { - Double score = model.predict(p.features()); - return new Tuple2<Object, Object>(score, p.label()); - } - } - ); + JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p -> + new Tuple2<>(model.predict(p.features()), p.label())); // Get evaluation metrics. BinaryClassificationMetrics metrics = http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java index 7f4fe60..f9198e7 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java @@ -23,9 +23,6 @@ import java.util.List; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -// $example off$ -import org.apache.spark.api.java.function.Function; -// $example on$ import org.apache.spark.mllib.fpm.AssociationRules; import org.apache.spark.mllib.fpm.FPGrowth; import org.apache.spark.mllib.fpm.FPGrowthModel; @@ -42,14 +39,7 @@ public class JavaSimpleFPGrowth { // $example on$ JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt"); - JavaRDD<List<String>> transactions = data.map( - new Function<String, List<String>>() { - public List<String> call(String line) { - String[] parts = line.split(" "); - return Arrays.asList(parts); - } - } - ); + JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" "))); FPGrowth fpg = new FPGrowth() .setMinSupport(0.2) http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index cfaa577..4be702c 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -17,10 +17,6 @@ package org.apache.spark.examples.mllib; - -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; // $example on$ import org.apache.spark.mllib.stat.test.BinarySample; import org.apache.spark.mllib.stat.test.StreamingTest; @@ -75,16 +71,12 @@ public class 
JavaStreamingTestExample { ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString()); // $example on$ - JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map( - new Function<String, BinarySample>() { - @Override - public BinarySample call(String line) { - String[] ts = line.split(","); - boolean label = Boolean.parseBoolean(ts[0]); - double value = Double.parseDouble(ts[1]); - return new BinarySample(label, value); - } - }); + JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(line -> { + String[] ts = line.split(","); + boolean label = Boolean.parseBoolean(ts[0]); + double value = Double.parseDouble(ts[1]); + return new BinarySample(label, value); + }); StreamingTest streamingTest = new StreamingTest() .setPeacePeriod(0) @@ -98,21 +90,11 @@ public class JavaStreamingTestExample { // Stop processing if test becomes significant or we time out timeoutCounter = numBatchesTimeout; - out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() { - @Override - public void call(JavaRDD<StreamingTestResult> rdd) { - timeoutCounter -= 1; - - boolean anySignificant = !rdd.filter(new Function<StreamingTestResult, Boolean>() { - @Override - public Boolean call(StreamingTestResult v) { - return v.pValue() < 0.05; - } - }).isEmpty(); - - if (timeoutCounter <= 0 || anySignificant) { - rdd.context().stop(); - } + out.foreachRDD(rdd -> { + timeoutCounter -= 1; + boolean anySignificant = !rdd.filter(v -> v.pValue() < 0.05).isEmpty(); + if (timeoutCounter <= 0 || anySignificant) { + rdd.context().stop(); } }); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index b687fae..adb96dd 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -139,11 +139,9 @@ public class JavaSQLDataSourceExample { // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile"); Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19"); - Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() { - public String call(Row row) { - return "Name: " + row.getString(0); - } - }, Encoders.STRING()); + Dataset<String> namesDS = namesDF.map( + (MapFunction<Row, String>) row -> "Name: " + row.getString(0), + Encoders.STRING()); namesDS.show(); // +------------+ // | value| http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java index c5770d1..8605852 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java @@ -227,12 +227,9 @@ public class JavaSparkSQLExample { // Encoders for most common types are provided in class Encoders Encoder<Integer> 
integerEncoder = Encoders.INT(); Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder); - Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() { - @Override - public Integer call(Integer value) throws Exception { - return value + 1; - } - }, integerEncoder); + Dataset<Integer> transformedDS = primitiveDS.map( + (MapFunction<Integer, Integer>) value -> value + 1, + integerEncoder); transformedDS.collect(); // Returns [2, 3, 4] // DataFrames can be converted to a Dataset by providing a class. Mapping based on name @@ -255,15 +252,12 @@ public class JavaSparkSQLExample { JavaRDD<Person> peopleRDD = spark.read() .textFile("examples/src/main/resources/people.txt") .javaRDD() - .map(new Function<String, Person>() { - @Override - public Person call(String line) throws Exception { - String[] parts = line.split(","); - Person person = new Person(); - person.setName(parts[0]); - person.setAge(Integer.parseInt(parts[1].trim())); - return person; - } + .map(line -> { + String[] parts = line.split(","); + Person person = new Person(); + person.setName(parts[0]); + person.setAge(Integer.parseInt(parts[1].trim())); + return person; }); // Apply a schema to an RDD of JavaBeans to get a DataFrame @@ -276,12 +270,9 @@ public class JavaSparkSQLExample { // The columns of a row in the result can be accessed by field index Encoder<String> stringEncoder = Encoders.STRING(); - Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.getString(0); - } - }, stringEncoder); + Dataset<String> teenagerNamesByIndexDF = teenagersDF.map( + (MapFunction<Row, String>) row -> "Name: " + row.getString(0), + stringEncoder); teenagerNamesByIndexDF.show(); // +------------+ // | value| @@ -290,12 +281,9 @@ public class JavaSparkSQLExample { // +------------+ // or by field name - Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.<String>getAs("name"); - } - }, stringEncoder); + Dataset<String> teenagerNamesByFieldDF = teenagersDF.map( + (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"), + stringEncoder); teenagerNamesByFieldDF.show(); // +------------+ // | value| @@ -324,12 +312,9 @@ public class JavaSparkSQLExample { StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows - JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() { - @Override - public Row call(String record) throws Exception { - String[] attributes = record.split(","); - return RowFactory.create(attributes[0], attributes[1].trim()); - } + JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> { + String[] attributes = record.split(","); + return RowFactory.create(attributes[0], attributes[1].trim()); }); // Apply the schema to the RDD @@ -343,12 +328,9 @@ public class JavaSparkSQLExample { // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name - Dataset<String> namesDS = results.map(new MapFunction<Row, String>() { - @Override - public String call(Row row) throws Exception { - return "Name: " + row.getString(0); - } - }, Encoders.STRING()); + Dataset<String> namesDS = results.map( + (MapFunction<Row, String>) row -> "Name: " + row.getString(0), + 
Encoders.STRING()); namesDS.show(); // +-------------+ // | value| http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java index 2fe1307..4763856 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java @@ -90,12 +90,9 @@ public class JavaSparkHiveExample { Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key"); // The items in DaraFrames are of type Row, which lets you to access each column by ordinal. - Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() { - @Override - public String call(Row row) throws Exception { - return "Key: " + row.get(0) + ", Value: " + row.get(1); - } - }, Encoders.STRING()); + Dataset<String> stringsDS = sqlDF.map( + (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1), + Encoders.STRING()); stringsDS.show(); // +--------------------+ // | value| http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java index 0f45cfe..4e02719 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java @@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery; import java.util.Arrays; -import java.util.Iterator; /** * Consumes messages from one or more topics in Kafka and does wordcount. 
@@ -78,12 +77,9 @@ public final class JavaStructuredKafkaWordCount { .as(Encoders.STRING()); // Generate running word count - Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(x.split(" ")).iterator(); - } - }, Encoders.STRING()).groupBy("value").count(); + Dataset<Row> wordCounts = lines.flatMap( + (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), + Encoders.STRING()).groupBy("value").count(); // Start running the query that prints the running counts to the console StreamingQuery query = wordCounts.writeStream() http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java index 5f342e1..3af7869 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java @@ -21,7 +21,6 @@ import org.apache.spark.sql.*; import org.apache.spark.sql.streaming.StreamingQuery; import java.util.Arrays; -import java.util.Iterator; /** * Counts words in UTF8 encoded, '\n' delimited text received from the network. @@ -61,13 +60,9 @@ public final class JavaStructuredNetworkWordCount { .load(); // Split the lines into words - Dataset<String> words = lines.as(Encoders.STRING()) - .flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(x.split(" ")).iterator(); - } - }, Encoders.STRING()); + Dataset<String> words = lines.as(Encoders.STRING()).flatMap( + (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), + Encoders.STRING()); // Generate running word count Dataset<Row> wordCounts = words.groupBy("value").count(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java index 172d053..93ec5e2 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java @@ -18,13 +18,11 @@ package org.apache.spark.examples.sql.streaming; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.functions; import org.apache.spark.sql.streaming.StreamingQuery; import scala.Tuple2; import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; /** @@ -86,16 +84,12 @@ public final class JavaStructuredNetworkWordCountWindowed { // Split the lines into words, retaining timestamps Dataset<Row> words = lines .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) - .flatMap( - new FlatMapFunction<Tuple2<String, 
Timestamp>, Tuple2<String, Timestamp>>() { - @Override - public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) { - List<Tuple2<String, Timestamp>> result = new ArrayList<>(); - for (String word : t._1.split(" ")) { - result.add(new Tuple2<>(word, t._2)); - } - return result.iterator(); + .flatMap((FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) t -> { + List<Tuple2<String, Timestamp>> result = new ArrayList<>(); + for (String word : t._1.split(" ")) { + result.add(new Tuple2<>(word, t._2)); } + return result.iterator(); }, Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()) ).toDF("word", "timestamp"); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index e20b94d..47692ec 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -20,9 +20,6 @@ package org.apache.spark.examples.streaming; import com.google.common.io.Closeables; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; @@ -38,7 +35,6 @@ import java.net.ConnectException; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Iterator; import java.util.regex.Pattern; /** @@ -74,23 +70,9 @@ public class JavaCustomReceiver extends Receiver<String> { // words in input stream of \n delimited text (eg. 
generated by 'nc') JavaReceiverInputDStream<String> lines = ssc.receiverStream( new JavaCustomReceiver(args[0], Integer.parseInt(args[1]))); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); ssc.start(); @@ -108,15 +90,13 @@ public class JavaCustomReceiver extends Receiver<String> { port = port_; } + @Override public void onStart() { // Start the thread that receives data over a connection - new Thread() { - @Override public void run() { - receive(); - } - }.start(); + new Thread(this::receive).start(); } + @Override public void onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself isStopped() returns false @@ -127,13 +107,13 @@ public class JavaCustomReceiver extends Receiver<String> { try { Socket socket = null; BufferedReader reader = null; - String userInput = null; try { // connect to the server socket = new Socket(host, port); reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); // Until stopped or connection broken continue reading + String userInput; while (!isStopped() && (userInput = reader.readLine()) != null) { System.out.println("Received data '" + userInput + "'"); store(userInput); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java index ed118f8..5e5ae62 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming; import java.util.HashMap; import java.util.HashSet; import java.util.Arrays; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; @@ -30,7 +29,6 @@ import scala.Tuple2; import kafka.serializer.StringDecoder; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.Durations; @@ -82,31 +80,10 @@ public final class JavaDirectKafkaWordCount { ); // Get the lines, split them into words, count the words and print - JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { - @Override - public String call(Tuple2<String, String> tuple2) { - return tuple2._2(); - } - }); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - 
@Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaDStream<String> lines = messages.map(Tuple2::_2); + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); // Start the computation http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java index 33c0a2d..0c65104 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java @@ -18,7 +18,6 @@ package org.apache.spark.examples.streaming; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.flume.FlumeUtils; @@ -62,12 +61,7 @@ public final class JavaFlumeEventCount { flumeStream.count(); - flumeStream.count().map(new Function<Long, String>() { - @Override - public String call(Long in) { - return "Received " + in + " flume events."; - } - }).print(); + flumeStream.count().map(in -> "Received " + in + " flume events.").print(); ssc.start(); ssc.awaitTermination(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java index 8a5fd53..ce5acdc 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -18,7 +18,6 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.Map; import java.util.HashMap; import java.util.regex.Pattern; @@ -26,10 +25,6 @@ import java.util.regex.Pattern; import scala.Tuple2; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -78,32 +73,12 @@ public final class JavaKafkaWordCount { JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); - JavaDStream<String> lines = 
messages.map(new Function<Tuple2<String, String>, String>() { - @Override - public String call(Tuple2<String, String> tuple2) { - return tuple2._2(); - } - }); + JavaDStream<String> lines = messages.map(Tuple2::_2); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); jssc.start(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java index 7a8fe99..b217672 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java @@ -18,15 +18,11 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.regex.Pattern; import scala.Tuple2; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.StorageLevels; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; @@ -66,24 +62,9 @@ public final class JavaNetworkWordCount { // Replication necessary in distributed scenario for fault tolerance. 
JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); ssc.start(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java index 62413b4..e86f8ab 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java @@ -17,19 +17,15 @@ package org.apache.spark.examples.streaming; - +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import scala.Tuple2; -import com.google.common.collect.Lists; - import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -49,14 +45,14 @@ public final class JavaQueueStream { // Create the queue through which RDDs can be pushed to // a QueueInputDStream - Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>(); // Create and push some RDDs into the queue - List<Integer> list = Lists.newArrayList(); + List<Integer> list = new ArrayList<>(); for (int i = 0; i < 1000; i++) { list.add(i); } + Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>(); for (int i = 0; i < 30; i++) { rddQueue.add(ssc.sparkContext().parallelize(list)); } @@ -64,19 +60,9 @@ public final class JavaQueueStream { // Create the QueueInputDStream and use it do some processing JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue); JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i % 10, 1); - } - }); + i -> new Tuple2<>(i % 10, 1)); JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + (i1, i2) -> i1 + i2); reducedStream.print(); ssc.start(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java 
---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index acbc345..45a876d 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -18,10 +18,8 @@ package org.apache.spark.examples.streaming; import java.io.File; -import java.io.IOException; import java.nio.charset.Charset; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -30,12 +28,10 @@ import scala.Tuple2; import com.google.common.io.Files; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; -import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; @@ -120,7 +116,7 @@ public final class JavaRecoverableNetworkWordCount { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint System.out.println("Creating new context"); - final File outputFile = new File(outputPath); + File outputFile = new File(outputPath); if (outputFile.exists()) { outputFile.delete(); } @@ -132,52 +128,31 @@ public final class JavaRecoverableNetworkWordCount { // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. 
generated by 'nc') JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); + + wordCounts.foreachRDD((rdd, time) -> { + // Get or register the blacklist Broadcast + Broadcast<List<String>> blacklist = + JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); + // Get or register the droppedWordsCounter Accumulator + LongAccumulator droppedWordsCounter = + JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); + // Use blacklist to drop words and use droppedWordsCounter to count them + String counts = rdd.filter(wordCount -> { + if (blacklist.value().contains(wordCount._1())) { + droppedWordsCounter.add(wordCount._2()); + return false; + } else { + return true; } - }); - - wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() { - @Override - public void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException { - // Get or register the blacklist Broadcast - final Broadcast<List<String>> blacklist = - JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); - // Get or register the droppedWordsCounter Accumulator - final LongAccumulator droppedWordsCounter = - JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); - // Use blacklist to drop words and use droppedWordsCounter to count them - String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() { - @Override - public Boolean call(Tuple2<String, Integer> wordCount) { - if (blacklist.value().contains(wordCount._1())) { - droppedWordsCounter.add(wordCount._2()); - return false; - } else { - return true; - } - } - }).collect().toString(); - String output = "Counts at time " + time + " " + counts; - System.out.println(output); - System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); - System.out.println("Appending to " + outputFile.getAbsolutePath()); - Files.append(output + "\n", outputFile, Charset.defaultCharset()); - } + }).collect().toString(); + String output = "Counts at time " + time + " " + counts; + System.out.println(output); + System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); + System.out.println("Appending to " + outputFile.getAbsolutePath()); + Files.append(output + "\n", outputFile, Charset.defaultCharset()); }); return ssc; @@ -198,19 +173,15 @@ public final class JavaRecoverableNetworkWordCount { System.exit(1); } - final String ip = args[0]; - final int port = Integer.parseInt(args[1]); - final String checkpointDirectory = args[2]; - final String outputPath = args[3]; + String ip = args[0]; + int port = Integer.parseInt(args[1]); + String checkpointDirectory = args[2]; + String outputPath = args[3]; // Function to create JavaStreamingContext without any output operations // (used to 
detect the new context) - Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() { - @Override - public JavaStreamingContext call() { - return createContext(ip, port, checkpointDirectory, outputPath); - } - }; + Function0<JavaStreamingContext> createContextFunc = + () -> createContext(ip, port, checkpointDirectory, outputPath); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java index b8e9e12..948d1a2 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java @@ -18,20 +18,15 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.regex.Pattern; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.api.java.StorageLevels; import org.apache.spark.streaming.Durations; -import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; @@ -48,7 +43,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; * and then run the example * `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999` */ - public final class JavaSqlNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); @@ -70,39 +64,28 @@ public final class JavaSqlNetworkWordCount { // Replication necessary in distributed scenario for fault tolerance. 
JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); // Convert RDDs of the words DStream to DataFrame and run SQL query - words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() { - @Override - public void call(JavaRDD<String> rdd, Time time) { - SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); + words.foreachRDD((rdd, time) -> { + SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); - // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame - JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() { - @Override - public JavaRecord call(String word) { - JavaRecord record = new JavaRecord(); - record.setWord(word); - return record; - } - }); - Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class); + // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame + JavaRDD<JavaRecord> rowRDD = rdd.map(word -> { + JavaRecord record = new JavaRecord(); + record.setWord(word); + return record; + }); + Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class); - // Creates a temporary view using the DataFrame - wordsDataFrame.createOrReplaceTempView("words"); + // Creates a temporary view using the DataFrame + wordsDataFrame.createOrReplaceTempView("words"); - // Do word count on table using SQL and print it - Dataset<Row> wordCountsDataFrame = - spark.sql("select word, count(*) as total from words group by word"); - System.out.println("========= " + time + "========="); - wordCountsDataFrame.show(); - } + // Do word count on table using SQL and print it + Dataset<Row> wordCountsDataFrame = + spark.sql("select word, count(*) as total from words group by word"); + System.out.println("========= " + time + "========="); + wordCountsDataFrame.show(); }); ssc.start(); http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index ed36df8..9d8bd7f 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -18,7 +18,6 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -72,32 +71,17 @@ public class JavaStatefulNetworkWordCount { JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); - JavaPairDStream<String, Integer> 
wordsDstream = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }); + JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(s -> new Tuple2<>(s, 1)); // Update the cumulative count function Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = - new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() { - @Override - public Tuple2<String, Integer> call(String word, Optional<Integer> one, - State<Integer> state) { - int sum = one.orElse(0) + (state.exists() ? state.get() : 0); - Tuple2<String, Integer> output = new Tuple2<>(word, sum); - state.update(sum); - return output; - } + (word, one, state) -> { + int sum = one.orElse(0) + (state.exists() ? state.get() : 0); + Tuple2<String, Integer> output = new Tuple2<>(word, sum); + state.update(sum); + return output; }; // DStream made of get cumulative counts that get updated in every batch --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
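
Several of the Dataset hunks above (JavaSparkSQLExample, JavaSparkHiveExample, and the structured streaming word counts) cast the lambda to MapFunction or FlatMapFunction rather than passing it bare. The cast keeps overload resolution unambiguous, since Dataset.map and Dataset.flatMap are also overloaded for Scala function types. A minimal standalone sketch, assuming a local SparkSession and a toy Dataset<Row> that are not part of the commit:

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DatasetLambdaCastSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("cast-sketch").master("local[*]").getOrCreate();

        // Illustrative data: spark.range(n) gives a Dataset<Long>; toDF() makes it a Dataset<Row>.
        Dataset<Row> df = spark.range(5).toDF("id");

        // Without the cast, df.map(row -> ..., Encoders.STRING()) can be ambiguous,
        // because Dataset.map accepts both the Java MapFunction and the Scala function type.
        Dataset<String> labels = df.map(
            (MapFunction<Row, String>) row -> "id: " + row.getLong(0),
            Encoders.STRING());

        labels.show();
        spark.stop();
      }
    }

The RDD and DStream hunks do not need the cast because JavaRDD and JavaDStream expose only the Java functional-interface overloads, which is why they pass bare lambdas such as s -> new Tuple2<>(s, 1).
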
