Repository: spark
Updated Branches:
  refs/heads/master de14d35f7 -> 1487c9af2
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index a94a37c..577672c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -96,12 +96,7 @@ public class JavaDatasetSuite implements Serializable {
   @Test
   public void testTypedFilterPreservingSchema() {
     Dataset<Long> ds = spark.range(10);
-    Dataset<Long> ds2 = ds.filter(new FilterFunction<Long>() {
-      @Override
-      public boolean call(Long value) throws Exception {
-        return value > 3;
-      }
-    });
+    Dataset<Long> ds2 = ds.filter((FilterFunction<Long>) value -> value > 3);
     Assert.assertEquals(ds.schema(), ds2.schema());
   }
 
@@ -111,44 +106,28 @@ public class JavaDatasetSuite implements Serializable {
     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
     Assert.assertEquals("hello", ds.first());
 
-    Dataset<String> filtered = ds.filter(new FilterFunction<String>() {
-      @Override
-      public boolean call(String v) throws Exception {
-        return v.startsWith("h");
-      }
-    });
+    Dataset<String> filtered = ds.filter((FilterFunction<String>) v -> v.startsWith("h"));
     Assert.assertEquals(Arrays.asList("hello"), filtered.collectAsList());
 
-    Dataset<Integer> mapped = ds.map(new MapFunction<String, Integer>() {
-      @Override
-      public Integer call(String v) throws Exception {
-        return v.length();
-      }
-    }, Encoders.INT());
+    Dataset<Integer> mapped = ds.map((MapFunction<String, Integer>) v -> v.length(), Encoders.INT());
     Assert.assertEquals(Arrays.asList(5, 5), mapped.collectAsList());
 
-    Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() {
-      @Override
-      public Iterator<String> call(Iterator<String> it) {
-        List<String> ls = new LinkedList<>();
-        while (it.hasNext()) {
-          ls.add(it.next().toUpperCase(Locale.ENGLISH));
-        }
-        return ls.iterator();
+    Dataset<String> parMapped = ds.mapPartitions((MapPartitionsFunction<String, String>) it -> {
+      List<String> ls = new LinkedList<>();
+      while (it.hasNext()) {
+        ls.add(it.next().toUpperCase(Locale.ENGLISH));
       }
+      return ls.iterator();
     }, Encoders.STRING());
     Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());
 
-    Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String s) {
-        List<String> ls = new LinkedList<>();
-        for (char c : s.toCharArray()) {
-          ls.add(String.valueOf(c));
-        }
-        return ls.iterator();
+    Dataset<String> flatMapped = ds.flatMap((FlatMapFunction<String, String>) s -> {
+      List<String> ls = new LinkedList<>();
+      for (char c : s.toCharArray()) {
+        ls.add(String.valueOf(c));
       }
+      return ls.iterator();
     }, Encoders.STRING());
     Assert.assertEquals(
       Arrays.asList("h", "e", "l", "l", "o", "w", "o", "r", "l", "d"),
@@ -157,16 +136,11 @@ public class JavaDatasetSuite implements Serializable {
 
   @Test
   public void testForeach() {
-    final LongAccumulator accum = jsc.sc().longAccumulator();
+    LongAccumulator accum = jsc.sc().longAccumulator();
     List<String> data = Arrays.asList("a", "b", "c");
     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
 
-    ds.foreach(new ForeachFunction<String>() {
-      @Override
-      public void call(String s) throws Exception {
-        accum.add(1);
-      }
-    });
+    ds.foreach((ForeachFunction<String>) s -> accum.add(1));
     Assert.assertEquals(3, accum.value().intValue());
   }
 
@@ -175,12 +149,7 @@ public class JavaDatasetSuite implements Serializable {
     List<Integer> data = Arrays.asList(1, 2, 3);
     Dataset<Integer> ds = spark.createDataset(data, Encoders.INT());
 
-    int reduced = ds.reduce(new ReduceFunction<Integer>() {
-      @Override
-      public Integer call(Integer v1, Integer v2) throws Exception {
-        return v1 + v2;
-      }
-    });
+    int reduced = ds.reduce((ReduceFunction<Integer>) (v1, v2) -> v1 + v2);
     Assert.assertEquals(6, reduced);
   }
 
@@ -189,52 +158,38 @@ public class JavaDatasetSuite implements Serializable {
     List<String> data = Arrays.asList("a", "foo", "bar");
     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
     KeyValueGroupedDataset<Integer, String> grouped = ds.groupByKey(
-      new MapFunction<String, Integer>() {
-        @Override
-        public Integer call(String v) throws Exception {
-          return v.length();
-        }
-      },
+      (MapFunction<String, Integer>) v -> v.length(),
       Encoders.INT());
 
-    Dataset<String> mapped = grouped.mapGroups(new MapGroupsFunction<Integer, String, String>() {
-      @Override
-      public String call(Integer key, Iterator<String> values) throws Exception {
-        StringBuilder sb = new StringBuilder(key.toString());
-        while (values.hasNext()) {
-          sb.append(values.next());
-        }
-        return sb.toString();
+    Dataset<String> mapped = grouped.mapGroups((MapGroupsFunction<Integer, String, String>) (key, values) -> {
+      StringBuilder sb = new StringBuilder(key.toString());
+      while (values.hasNext()) {
+        sb.append(values.next());
       }
+      return sb.toString();
     }, Encoders.STRING());
 
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList()));
 
     Dataset<String> flatMapped = grouped.flatMapGroups(
-      new FlatMapGroupsFunction<Integer, String, String>() {
-        @Override
-        public Iterator<String> call(Integer key, Iterator<String> values) {
+      (FlatMapGroupsFunction<Integer, String, String>) (key, values) -> {
         StringBuilder sb = new StringBuilder(key.toString());
         while (values.hasNext()) {
           sb.append(values.next());
         }
         return Collections.singletonList(sb.toString()).iterator();
-        }
-      },
+      },
       Encoders.STRING());
 
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped.collectAsList()));
 
     Dataset<String> mapped2 = grouped.mapGroupsWithState(
-      new MapGroupsWithStateFunction<Integer, String, Long, String>() {
-        @Override
-        public String call(Integer key, Iterator<String> values, KeyedState<Long> s) {
+      (MapGroupsWithStateFunction<Integer, String, Long, String>) (key, values, s) -> {
         StringBuilder sb = new StringBuilder(key.toString());
         while (values.hasNext()) {
           sb.append(values.next());
         }
         return sb.toString();
-        }
       },
       Encoders.LONG(),
       Encoders.STRING());
@@ -242,27 +197,19 @@ public class JavaDatasetSuite implements Serializable {
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped2.collectAsList()));
 
     Dataset<String> flatMapped2 = grouped.flatMapGroupsWithState(
-      new FlatMapGroupsWithStateFunction<Integer, String, Long, String>() {
-        @Override
-        public Iterator<String> call(Integer key, Iterator<String> values, KeyedState<Long> s) {
+      (FlatMapGroupsWithStateFunction<Integer, String, Long, String>) (key, values, s) -> {
        StringBuilder sb = new StringBuilder(key.toString());
        while (values.hasNext()) {
          sb.append(values.next());
        }
        return Collections.singletonList(sb.toString()).iterator();
-        }
-      },
+      },
       Encoders.LONG(),
       Encoders.STRING());
 
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped2.collectAsList()));
 
-    Dataset<Tuple2<Integer, String>> reduced = grouped.reduceGroups(new ReduceFunction<String>() {
-      @Override
-      public String call(String v1, String v2) throws Exception {
-        return v1 + v2;
-      }
-    });
+    Dataset<Tuple2<Integer, String>> reduced = grouped.reduceGroups((ReduceFunction<String>) (v1, v2) -> v1 + v2);
 
     Assert.assertEquals(
       asSet(tuple2(1, "a"), tuple2(3, "foobar")),
@@ -271,29 +218,21 @@ public class JavaDatasetSuite implements Serializable {
     List<Integer> data2 = Arrays.asList(2, 6, 10);
     Dataset<Integer> ds2 = spark.createDataset(data2, Encoders.INT());
     KeyValueGroupedDataset<Integer, Integer> grouped2 = ds2.groupByKey(
-      new MapFunction<Integer, Integer>() {
-        @Override
-        public Integer call(Integer v) throws Exception {
-          return v / 2;
-        }
-      },
+      (MapFunction<Integer, Integer>) v -> v / 2,
       Encoders.INT());
 
     Dataset<String> cogrouped = grouped.cogroup(
       grouped2,
-      new CoGroupFunction<Integer, String, Integer, String>() {
-        @Override
-        public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Integer> right) {
-          StringBuilder sb = new StringBuilder(key.toString());
-          while (left.hasNext()) {
-            sb.append(left.next());
-          }
-          sb.append("#");
-          while (right.hasNext()) {
-            sb.append(right.next());
-          }
-          return Collections.singletonList(sb.toString()).iterator();
+      (CoGroupFunction<Integer, String, Integer, String>) (key, left, right) -> {
+        StringBuilder sb = new StringBuilder(key.toString());
+        while (left.hasNext()) {
+          sb.append(left.next());
+        }
+        sb.append("#");
+        while (right.hasNext()) {
+          sb.append(right.next());
        }
+        return Collections.singletonList(sb.toString()).iterator();
      },
      Encoders.STRING());
@@ -703,11 +642,11 @@ public class JavaDatasetSuite implements Serializable {
     obj1.setD(new String[]{"hello", null});
     obj1.setE(Arrays.asList("a", "b"));
     obj1.setF(Arrays.asList(100L, null, 200L));
-    Map<Integer, String> map1 = new HashMap<Integer, String>();
+    Map<Integer, String> map1 = new HashMap<>();
     map1.put(1, "a");
     map1.put(2, "b");
     obj1.setG(map1);
-    Map<String, String> nestedMap1 = new HashMap<String, String>();
+    Map<String, String> nestedMap1 = new HashMap<>();
     nestedMap1.put("x", "1");
     nestedMap1.put("y", "2");
     Map<List<Long>, Map<String, String>> complexMap1 = new HashMap<>();
@@ -721,11 +660,11 @@ public class JavaDatasetSuite implements Serializable {
     obj2.setD(new String[]{null, "world"});
     obj2.setE(Arrays.asList("x", "y"));
     obj2.setF(Arrays.asList(300L, null, 400L));
-    Map<Integer, String> map2 = new HashMap<Integer, String>();
+    Map<Integer, String> map2 = new HashMap<>();
     map2.put(3, "c");
     map2.put(4, "d");
     obj2.setG(map2);
-    Map<String, String> nestedMap2 = new HashMap<String, String>();
+    Map<String, String> nestedMap2 = new HashMap<>();
     nestedMap2.put("q", "1");
     nestedMap2.put("w", "2");
     Map<List<Long>, Map<String, String>> complexMap2 = new HashMap<>();
@@ -1328,7 +1267,7 @@ public class JavaDatasetSuite implements Serializable {
   @Test
   public void test() {
     /* SPARK-15285 Large numbers of Nested JavaBeans generates more than 64KB java bytecode */
-    List<NestedComplicatedJavaBean> data = new ArrayList<NestedComplicatedJavaBean>();
+    List<NestedComplicatedJavaBean> data = new ArrayList<>();
     data.add(NestedComplicatedJavaBean.newBuilder().build());
     NestedComplicatedJavaBean obj3 = new NestedComplicatedJavaBean();

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
index bbaac5a..250fa67 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
@@ -27,7 +27,6 @@ import org.junit.Test;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.api.java.UDF1;
 import org.apache.spark.sql.api.java.UDF2;
 import org.apache.spark.sql.types.DataTypes;
 
@@ -54,16 +53,7 @@ public class JavaUDFSuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void udf1Test() {
-    // With Java 8 lambdas:
-    // sqlContext.registerFunction(
-    //   "stringLengthTest", (String str) -> str.length(), DataType.IntegerType);
-
-    spark.udf().register("stringLengthTest", new UDF1<String, Integer>() {
-      @Override
-      public Integer call(String str) {
-        return str.length();
-      }
-    }, DataTypes.IntegerType);
+    spark.udf().register("stringLengthTest", (String str) -> str.length(), DataTypes.IntegerType);
 
     Row result = spark.sql("SELECT stringLengthTest('test')").head();
     Assert.assertEquals(4, result.getInt(0));
@@ -72,18 +62,8 @@ public class JavaUDFSuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void udf2Test() {
-    // With Java 8 lambdas:
-    // sqlContext.registerFunction(
-    //   "stringLengthTest",
-    //   (String str1, String str2) -> str1.length() + str2.length,
-    //   DataType.IntegerType);
-
-    spark.udf().register("stringLengthTest", new UDF2<String, String, Integer>() {
-      @Override
-      public Integer call(String str1, String str2) {
-        return str1.length() + str2.length();
-      }
-    }, DataTypes.IntegerType);
+    spark.udf().register("stringLengthTest",
+        (String str1, String str2) -> str1.length() + str2.length(), DataTypes.IntegerType);
 
     Row result = spark.sql("SELECT stringLengthTest('test', 'test2')").head();
     Assert.assertEquals(9, result.getInt(0));
@@ -91,8 +71,8 @@ public class JavaUDFSuite implements Serializable {
 
   public static class StringLengthTest implements UDF2<String, String, Integer> {
     @Override
-    public Integer call(String str1, String str2) throws Exception {
-      return new Integer(str1.length() + str2.length());
+    public Integer call(String str1, String str2) {
+      return str1.length() + str2.length();
     }
   }
 
@@ -113,12 +93,7 @@ public class JavaUDFSuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void udf4Test() {
-    spark.udf().register("inc", new UDF1<Long, Long>() {
-      @Override
-      public Long call(Long i) {
-        return i + 1;
-      }
-    }, DataTypes.LongType);
+    spark.udf().register("inc", (Long i) -> i + 1, DataTypes.LongType);
 
     spark.range(10).toDF("x").createOrReplaceTempView("tmp");
     // This tests when Java UDFs are required to be the semantically same (See SPARK-9435).
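A note on the casts kept in the lambda versions above: Dataset.filter, map, mapPartitions, flatMap and friends are overloaded to take either Spark's Java functional interfaces (FilterFunction, MapFunction, ...) or their Scala counterparts, so an implicitly typed lambda on its own can leave javac with an ambiguous call; the cast names the intended target interface. A minimal, self-contained sketch of the same effect, using hypothetical interfaces rather than Spark's own:

    // OverloadDemo.java -- why the diff writes (FilterFunction<Long>) value -> value > 3
    public class OverloadDemo {
      // Two unrelated functional interfaces, standing in for the Java and
      // Scala overloads that Dataset methods accept.
      interface JavaPredicate<T> { boolean test(T value); }
      interface OtherPredicate<T> { Boolean apply(T value); }

      static <T> int countIf(JavaPredicate<T> p) { return 1; }
      static <T> int countIf(OtherPredicate<T> p) { return 2; }

      public static void main(String[] args) {
        // countIf(value -> true);  // would not compile: reference to countIf is ambiguous
        int chosen = countIf((JavaPredicate<Long>) value -> value > 3);  // cast picks the overload
        System.out.println(chosen);  // prints 1
      }
    }

The UDF registrations get by without a cast because register(...) is overloaded by arity (UDF1, UDF2, ...), and the explicitly typed parameters such as (String str) appear to give javac enough information to infer the UDF's type arguments.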
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 9b77010..cb8ed83 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -27,9 +27,6 @@ import java.util.Set;
 import scala.Tuple2;
 
 import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.util.ManualClock;
 import org.junit.Assert;
@@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
     JavaPairDStream<String, Integer> wordsDstream = null;
 
     Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc =
-        new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
-          @Override
-          public Optional<Double> call(
-              Time time, String word, Optional<Integer> one, State<Boolean> state) {
-            // Use all State's methods here
-            state.exists();
-            state.get();
-            state.isTimingOut();
-            state.remove();
-            state.update(true);
-            return Optional.of(2.0);
-          }
+        (time, word, one, state) -> {
+          // Use all State's methods here
+          state.exists();
+          state.get();
+          state.isTimingOut();
+          state.remove();
+          state.update(true);
+          return Optional.of(2.0);
         };
 
     JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
@@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
     stateDstream.stateSnapshots();
 
     Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
-        new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
-          @Override
-          public Double call(String key, Optional<Integer> one, State<Boolean> state) {
-            // Use all State's methods here
-            state.exists();
-            state.get();
-            state.isTimingOut();
-            state.remove();
-            state.update(true);
-            return 2.0;
-          }
+        (key, one, state) -> {
+          // Use all State's methods here
+          state.exists();
+          state.get();
+          state.isTimingOut();
+          state.remove();
+          state.update(true);
+          return 2.0;
         };
 
     JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
@@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
     );
 
     Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
-        new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
-          @Override
-          public Integer call(String key, Optional<Integer> value, State<Integer> state) {
-            int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
-            state.update(sum);
-            return sum;
-          }
+        (key, value, state) -> {
+          int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
+          state.update(sum);
+          return sum;
         };
 
     testOperation(
         inputData,
@@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
     int numBatches = expectedOutputs.size();
     JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
     JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
-        JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
-          @Override
-          public Tuple2<K, Integer> call(K x) {
-            return new Tuple2<>(x, 1);
-          }
-        })).mapWithState(mapWithStateSpec);
-
-    final List<Set<T>> collectedOutputs =
+        JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
+
+    List<Set<T>> collectedOutputs =
         Collections.synchronizedList(new ArrayList<Set<T>>());
-    mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
-      @Override
-      public void call(JavaRDD<T> rdd) {
-        collectedOutputs.add(Sets.newHashSet(rdd.collect()));
-      }
-    });
-    final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
+    mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect())));
+    List<Set<Tuple2<K, S>>> collectedStateSnapshots =
         Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>());
-    mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() {
-      @Override
-      public void call(JavaPairRDD<K, S> rdd) {
-        collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));
-      }
-    });
+    mapWithStateDStream.stateSnapshots().foreachRDD(rdd ->
+        collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())));
     BatchCounter batchCounter = new BatchCounter(ssc.ssc());
     ssc.start();
     ((ManualClock) ssc.ssc().scheduler().clock())

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 091ccbf..9156047 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -58,24 +58,16 @@ public class JavaReceiverAPISuite implements Serializable {
     TestServer server = new TestServer(0);
     server.start();
 
-    final AtomicLong dataCounter = new AtomicLong(0);
+    AtomicLong dataCounter = new AtomicLong(0);
 
     try {
       JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
       JavaReceiverInputDStream<String> input =
           ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
-      JavaDStream<String> mapped = input.map(new Function<String, String>() {
-        @Override
-        public String call(String v1) {
-          return v1 + ".";
-        }
-      });
-      mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-        @Override
-        public void call(JavaRDD<String> rdd) {
-          long count = rdd.count();
-          dataCounter.addAndGet(count);
-        }
+      JavaDStream<String> mapped = input.map((Function<String, String>) v1 -> v1 + ".");
+      mapped.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
+        long count = rdd.count();
+        dataCounter.addAndGet(count);
       });
 
       ssc.start();
@@ -110,11 +102,7 @@ public class JavaReceiverAPISuite implements Serializable {
 
     @Override
     public void onStart() {
-      new Thread() {
-        @Override public void run() {
-          receive();
-        }
-      }.start();
+      new Thread(this::receive).start();
     }
 
     @Override
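The mapping functions above need no casts: mapWithState takes a StateSpec, and a three- or four-argument lambda matches exactly one StateSpec.function overload (Function3 vs. Function4), so arity alone disambiguates. For reference, a compact sketch of the running-count pattern exercised by the suite, assuming a Spark Streaming dependency on the classpath (the method and variable names here are illustrative, not from the patch):

    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function3;
    import org.apache.spark.streaming.State;
    import org.apache.spark.streaming.StateSpec;
    import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    public class MapWithStateSketch {
      // Keeps a running sum per key; the lambda is type-checked against
      // Function3's single call(...) method, so no cast is required.
      static JavaMapWithStateDStream<String, Integer, Integer, Integer> runningCounts(
          JavaPairDStream<String, Integer> wordCounts) {
        Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
            (word, count, state) -> {
              int sum = count.orElse(0) + (state.exists() ? state.get() : 0);
              state.update(sum);  // written back so the next batch sees it
              return sum;
            };
        return wordCounts.mapWithState(StateSpec.function(mappingFunc));
      }
    }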
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index f02fa87..3f4e6dd 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import org.apache.spark.SparkConf;
 import org.apache.spark.network.util.JavaUtils;
@@ -81,12 +80,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
 
   @Override
   public Iterator<ByteBuffer> readAll() {
-    return Iterators.transform(records.iterator(), new Function<Record,ByteBuffer>() {
-      @Override
-      public ByteBuffer apply(Record input) {
-        return input.buffer;
-      }
-    });
+    return Iterators.transform(records.iterator(), input -> input.buffer);
   }
 
   @Override
@@ -114,7 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
     String data1 = "data1";
     WriteAheadLogRecordHandle handle = wal.write(JavaUtils.stringToBytes(data1), 1234);
     Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
-    Assert.assertEquals(JavaUtils.bytesToString(wal.read(handle)), data1);
+    Assert.assertEquals(data1, JavaUtils.bytesToString(wal.read(handle)));
 
     wal.write(JavaUtils.stringToBytes("data2"), 1235);
     wal.write(JavaUtils.stringToBytes("data3"), 1236);

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 646cb97..9948a40 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -28,7 +28,6 @@ import org.apache.spark.streaming.StateSpec;
 import org.apache.spark.streaming.Time;
 import scala.Tuple2;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
@@ -101,7 +100,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       while (in.hasNext()) {
         out = out + in.next().toUpperCase();
       }
-      return Lists.newArrayList(out).iterator();
+      return Arrays.asList(out).iterator();
     });
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -240,7 +239,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaTestUtils.attachTestOutputStream(joined);
     List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
-    List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+    List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
     for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
       unorderedResult.add(Sets.newHashSet(res));
     }
@@ -315,7 +314,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
       JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
 
-    List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+    List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
 
     // This is just to test whether this transform to JavaStream compiles
     JavaDStream<Long> transformed1 = ssc.transform(
@@ -325,7 +324,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       });
 
     List<JavaDStream<?>> listOfDStreams2 =
-      Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+      Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
 
     JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
       listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
@@ -358,7 +357,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<String> flatMapped = stream.flatMap(
-      s -> Lists.newArrayList(s.split("(?!^)")).iterator());
+      s -> Arrays.asList(s.split("(?!^)")).iterator());
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -401,7 +400,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
-      List<Tuple2<Integer, String>> out = Lists.newArrayList();
+      List<Tuple2<Integer, String>> out = new ArrayList<>();
       for (String letter : s.split("(?!^)")) {
         out.add(new Tuple2<>(s.length(), letter));
       }
@@ -420,7 +419,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
    */
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
       List<List<T>> expected, List<List<T>> actual) {
-    expected.forEach(list -> Collections.sort(list));
+    expected.forEach(Collections::sort);
     List<List<T>> sortedActual = new ArrayList<>();
     actual.forEach(list -> {
       List<T> sortedList = new ArrayList<>(list);
@@ -491,7 +490,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -543,7 +542,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
+    JavaDStream<Integer> reversed = pairStream.map(Tuple2::_2);
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -629,7 +628,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
+    JavaPairDStream<String, Integer> combined = pairStream.combineByKey(i -> i,
       (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
 
     JavaTestUtils.attachTestOutputStream(combined);

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index 8d24104..b966cbd 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,6 @@ import org.apache.spark.streaming.Time;
 import scala.Tuple2;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -123,12 +122,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(9,4));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
-      @Override
-      public Integer call(String s) {
-        return s.length();
-      }
-    });
+    JavaDStream<Integer> letterCount = stream.map(String::length);
     JavaTestUtils.attachTestOutputStream(letterCount);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -194,12 +188,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList("yankees"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
-      @Override
-      public Boolean call(String s) {
-        return s.contains("a");
-      }
-    });
+    JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
     JavaTestUtils.attachTestOutputStream(filtered);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -276,17 +265,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList("YANKEESRED SOX"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> mapped = stream.mapPartitions(
-        new FlatMapFunction<Iterator<String>, String>() {
-          @Override
-          public Iterator<String> call(Iterator<String> in) {
-            StringBuilder out = new StringBuilder();
-            while (in.hasNext()) {
-              out.append(in.next().toUpperCase(Locale.ENGLISH));
-            }
-            return Arrays.asList(out.toString()).iterator();
-          }
-        });
+    JavaDStream<String> mapped = stream.mapPartitions(in -> {
+      StringBuilder out = new StringBuilder();
+      while (in.hasNext()) {
+        out.append(in.next().toUpperCase(Locale.ENGLISH));
+      }
+      return Arrays.asList(out.toString()).iterator();
+    });
 
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -416,18 +401,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(9,10,11));
 
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> transformed = stream.transform(
-      new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
-        @Override
-        public JavaRDD<Integer> call(JavaRDD<Integer> in) {
-          return in.map(new Function<Integer, Integer>() {
-            @Override
-            public Integer call(Integer i) {
-              return i + 2;
-            }
-          });
-        }
-      });
+    JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
 
     JavaTestUtils.attachTestOutputStream(transformed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -448,71 +422,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
 
-    stream.transform(
-        new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
-          @Override
-          public JavaRDD<Integer> call(JavaRDD<Integer> in) {
-            return null;
-          }
-        }
-    );
+    stream.transform(in -> null);
 
-    stream.transform(
-        new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
-          @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) {
-            return null;
-          }
-        }
-    );
+    stream.transform((in, time) -> null);
 
-    stream.transformToPair(
-        new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
-          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) {
-            return null;
-          }
-        }
-    );
+    stream.transformToPair(in -> null);
 
-    stream.transformToPair(
-        new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
-          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) {
-            return null;
-          }
-        }
-    );
+    stream.transformToPair((in, time) -> null);
 
-    pairStream.transform(
-        new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
-          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) {
-            return null;
-          }
-        }
-    );
+    pairStream.transform(in -> null);
 
-    pairStream.transform(
-        new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
-          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream.transform((in, time) -> null);
 
-    pairStream.transformToPair(
-        new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
-          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) {
-            return null;
-          }
-        }
-    );
+    pairStream.transformToPair(in -> null);
 
-    pairStream.transformToPair(
-        new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
-          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in,
-              Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream.transformToPair((in, time) -> null);
 
   }
@@ -558,19 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
         pairStream2,
-        new Function3<
-            JavaPairRDD<String, String>,
-            JavaPairRDD<String, String>,
-            Time,
-            JavaPairRDD<String, Tuple2<String, String>>>() {
-          @Override
-          public JavaPairRDD<String, Tuple2<String, String>> call(
-              JavaPairRDD<String, String> rdd1,
-              JavaPairRDD<String, String> rdd2,
-              Time time) {
-            return rdd1.join(rdd2);
-          }
-        }
+        (rdd1, rdd2, time) -> rdd1.join(rdd2)
     );
 
     JavaTestUtils.attachTestOutputStream(joined);
@@ -603,100 +515,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
 
-    stream1.transformWith(
-        stream2,
-        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
-          @Override
-          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
-            return null;
-          }
-        }
-    );
+    stream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
 
-    stream1.transformWith(
-        pairStream1,
-        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
-          @Override
-          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2,
-              Time time) {
-            return null;
-          }
-        }
-    );
+    stream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
 
-    stream1.transformWithToPair(
-        stream2,
-        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
-          @Override
-          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2,
-              Time time) {
-            return null;
-          }
-        }
-    );
+    stream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
 
-    stream1.transformWithToPair(
-        pairStream1,
-        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time,
-            JavaPairRDD<Double, Double>>() {
-          @Override
-          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1,
-              JavaPairRDD<String, Integer> rdd2,
-              Time time) {
-            return null;
-          }
-        }
-    );
+    stream1.transformWithToPair(pairStream1, (rdd1, rdd2, time) -> null);
 
-    pairStream1.transformWith(
-        stream2,
-        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
-          @Override
-          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2,
-              Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
 
-    pairStream1.transformWith(
-        pairStream1,
-        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time,
-            JavaRDD<Double>>() {
-          @Override
-          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1,
-              JavaPairRDD<String, Integer> rdd2,
-              Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
 
-    pairStream1.transformWithToPair(
-        stream2,
-        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time,
-            JavaPairRDD<Double, Double>>() {
-          @Override
-          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
-              JavaRDD<String> rdd2,
-              Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
 
-    pairStream1.transformWithToPair(
-        pairStream2,
-        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time,
-            JavaPairRDD<Double, Double>>() {
-          @Override
-          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
-              JavaPairRDD<Double, Character> rdd2,
-              Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream1.transformWithToPair(pairStream2, (rdd1, rdd2, time) -> null);
 
   }
 
   @SuppressWarnings("unchecked")
@@ -727,44 +560,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
 
-    List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+    List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
 
     // This is just to test whether this transform to JavaStream compiles
     ssc.transform(
        listOfDStreams1,
-        new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
-          @Override
-          public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
-            Assert.assertEquals(2, listOfRDDs.size());
-            return null;
-          }
+        (listOfRDDs, time) -> {
+          Assert.assertEquals(2, listOfRDDs.size());
+          return null;
        }
     );
 
     List<JavaDStream<?>> listOfDStreams2 =
-        Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+        Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
 
     JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
        listOfDStreams2,
-        new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
-          @Override
-          public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs,
-              Time time) {
-            Assert.assertEquals(3, listOfRDDs.size());
-            JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
-            JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
-            JavaRDD<Tuple2<Integer, String>> rdd3 =
-                (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
-            JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
-            PairFunction<Integer, Integer, Integer> mapToTuple =
-                new PairFunction<Integer, Integer, Integer>() {
-                  @Override
-                  public Tuple2<Integer, Integer> call(Integer i) {
-                    return new Tuple2<>(i, i);
-                  }
-                };
-            return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
-          }
+        (listOfRDDs, time) -> {
+          Assert.assertEquals(3, listOfRDDs.size());
+          JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
+          JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
+          JavaRDD<Tuple2<Integer, String>> rdd3 =
+              (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
+          JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+          PairFunction<Integer, Integer, Integer> mapToTuple =
+              (PairFunction<Integer, Integer, Integer>) i -> new Tuple2<>(i, i);
+          return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
        }
     );
     JavaTestUtils.attachTestOutputStream(transformed2);
@@ -787,12 +608,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList("a","t","h","l","e","t","i","c","s"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(x.split("(?!^)")).iterator();
-      }
-    });
+    JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -811,25 +627,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
 
-    stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
-      @Override
-      public void call(JavaRDD<Integer> rdd) {
-        accumRdd.add(1);
-        rdd.foreach(new VoidFunction<Integer>() {
-          @Override
-          public void call(Integer i) {
-            accumEle.add(1);
-          }
-        });
-      }
+    stream.foreachRDD(rdd -> {
+      accumRdd.add(1);
+      rdd.foreach(i -> accumEle.add(1));
     });
 
     // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
-    stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
-      @Override
-      public void call(JavaRDD<Integer> rdd, Time time) {
-      }
-    });
+    stream.foreachRDD((rdd, time) -> {});
 
     JavaTestUtils.runStreams(ssc, 2, 2);
@@ -873,16 +677,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         new Tuple2<>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
-        new PairFlatMapFunction<String, Integer, String>() {
-          @Override
-          public Iterator<Tuple2<Integer, String>> call(String in) {
-            List<Tuple2<Integer, String>> out = new ArrayList<>();
-            for (String letter: in.split("(?!^)")) {
-              out.add(new Tuple2<>(in.length(), letter));
-            }
-            return out.iterator();
+    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(in -> {
+      List<Tuple2<Integer, String>> out = new ArrayList<>();
+      for (String letter : in.split("(?!^)")) {
+        out.add(new Tuple2<>(in.length(), letter));
      }
+      return out.iterator();
    });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -949,21 +749,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(new Tuple2<>("yankees", 7)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
-        new PairFunction<String, String, Integer>() {
-          @Override
-          public Tuple2<String, Integer> call(String in) {
-            return new Tuple2<>(in, in.length());
-          }
-        });
+    JavaPairDStream<String, Integer> pairStream =
+        stream.mapToPair(in -> new Tuple2<>(in, in.length()));
 
-    JavaPairDStream<String, Integer> filtered = pairStream.filter(
-        new Function<Tuple2<String, Integer>, Boolean>() {
-          @Override
-          public Boolean call(Tuple2<String, Integer> in) {
-            return in._1().contains("a");
-          }
-        });
+    JavaPairDStream<String, Integer> filtered = pairStream.filter(in -> in._1().contains("a"));
     JavaTestUtils.attachTestOutputStream(filtered);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1014,13 +803,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
-        new PairFunction<Tuple2<String, Integer>, Integer, String>() {
-          @Override
-          public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
-            return in.swap();
-          }
-        });
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
 
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1048,18 +831,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
-        new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
-          @Override
-          public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
-            List<Tuple2<Integer, String>> out = new LinkedList<>();
-            while (in.hasNext()) {
-              Tuple2<String, Integer> next = in.next();
-              out.add(next.swap());
-            }
-            return out.iterator();
-          }
-        });
+    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
+      List<Tuple2<Integer, String>> out = new LinkedList<>();
+      while (in.hasNext()) {
+        Tuple2<String, Integer> next = in.next();
+        out.add(next.swap());
+      }
+      return out.iterator();
+    });
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1079,13 +858,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaDStream<Integer> reversed = pairStream.map(
-        new Function<Tuple2<String, Integer>, Integer>() {
-          @Override
-          public Integer call(Tuple2<String, Integer> in) {
-            return in._2();
-          }
-        });
+    JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
 
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1119,17 +892,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
-        new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
-          @Override
-          public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
-            List<Tuple2<Integer, String>> out = new LinkedList<>();
-            for (Character s : in._1().toCharArray()) {
-              out.add(new Tuple2<>(in._2(), s.toString()));
-            }
-            return out.iterator();
-          }
-        });
+    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
+      List<Tuple2<Integer, String>> out = new LinkedList<>();
+      for (Character s : in._1().toCharArray()) {
+        out.add(new Tuple2<>(in._2(), s.toString()));
+      }
+      return out.iterator();
+    });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1216,12 +985,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
-        new Function<Integer, Integer>() {
-          @Override
-          public Integer call(Integer i) {
-            return i;
-          }
-        }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
+        i -> i, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
 
     JavaTestUtils.attachTestOutputStream(combined);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1345,20 +1109,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
-        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
-          @Override
-          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
-            int out = 0;
-            if (state.isPresent()) {
-              out += state.get();
-            }
-            for (Integer v : values) {
-              out += v;
-            }
-            return Optional.of(out);
-          }
-        });
+    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+      int out = 0;
+      if (state.isPresent()) {
+        out += state.get();
+      }
+      for (Integer v : values) {
+        out += v;
+      }
+      return Optional.of(out);
+    });
 
     JavaTestUtils.attachTestOutputStream(updated);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1389,20 +1149,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
-        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
-          @Override
-          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
-            int out = 0;
-            if (state.isPresent()) {
-              out += state.get();
-            }
-            for (Integer v : values) {
-              out += v;
-            }
-            return Optional.of(out);
-          }
-        }, new HashPartitioner(1), initialRDD);
+    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+      int out = 0;
+      if (state.isPresent()) {
+        out += state.get();
+      }
+      for (Integer v : values) {
+        out += v;
+      }
+      return Optional.of(out);
+    }, new HashPartitioner(1), initialRDD);
 
     JavaTestUtils.attachTestOutputStream(updated);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1500,13 +1256,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, inputData, 1);
     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
-        new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
-          @Override
-          public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) {
-            return in.sortByKey();
-          }
-        });
+    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
 
     JavaTestUtils.attachTestOutputStream(sorted);
     List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1537,18 +1287,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, inputData, 1);
     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaDStream<Integer> firstParts = pairStream.transform(
-        new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
-          @Override
-          public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) {
-            return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
-              @Override
-              public Integer call(Tuple2<Integer, Integer> in2) {
-                return in2._1();
-              }
-            });
-          }
-        });
+    JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(in2 -> in2._1()));
 
     JavaTestUtils.attachTestOutputStream(firstParts);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1575,12 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, inputData, 1);
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
-      @Override
-      public String call(String s) {
-        return s.toUpperCase(Locale.ENGLISH);
-      }
-    });
+    JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
 
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1616,16 +1350,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
-        new Function<String, Iterable<String>>() {
-          @Override
-          public Iterable<String> call(String in) {
-            List<String> out = new ArrayList<>();
-            out.add(in + "1");
-            out.add(in + "2");
-            return out;
-          }
-        });
+    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
+      List<String> out = new ArrayList<>();
+      out.add(in + "1");
+      out.add(in + "2");
+      return out;
+    });
 
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1795,12 +1525,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     ssc.checkpoint(tempDir.getAbsolutePath());
     JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
-      @Override
-      public Integer call(String s) {
-        return s.length();
-      }
-    });
+    JavaDStream<Integer> letterCount = stream.map(String::length);
     JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
     List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
@@ -1822,7 +1547,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testContextGetOrCreate() throws InterruptedException {
     ssc.stop();
 
-    final SparkConf conf = new SparkConf()
+    SparkConf conf = new SparkConf()
         .setMaster("local[2]")
         .setAppName("test")
         .set("newContext", "true");
@@ -1835,13 +1560,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     // Function to create JavaStreamingContext without any output operations
     // (used to detect the new context)
-    final AtomicBoolean newContextCreated = new AtomicBoolean(false);
-    Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
-      @Override
-      public JavaStreamingContext call() {
-        newContextCreated.set(true);
-        return new JavaStreamingContext(conf, Seconds.apply(1));
-      }
+    AtomicBoolean newContextCreated = new AtomicBoolean(false);
+    Function0<JavaStreamingContext> creatingFunc = () -> {
+      newContextCreated.set(true);
+      return new JavaStreamingContext(conf, Seconds.apply(1));
     };
 
     newContextCreated.set(false);
@@ -1912,18 +1634,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     ssc.socketStream(
       "localhost",
       12345,
-      new Function<InputStream, Iterable<String>>() {
-        @Override
-        public Iterable<String> call(InputStream in) throws IOException {
-          List<String> out = new ArrayList<>();
-          try (BufferedReader reader = new BufferedReader(
-              new InputStreamReader(in, StandardCharsets.UTF_8))) {
-            for (String line; (line = reader.readLine()) != null;) {
-              out.add(line);
-            }
+      in -> {
+        List<String> out = new ArrayList<>();
+        try (BufferedReader reader = new BufferedReader(
+            new InputStreamReader(in, StandardCharsets.UTF_8))) {
+          for (String line; (line = reader.readLine()) != null;) {
+            out.add(line);
          }
-          return out;
        }
+        return out;
      },
      StorageLevel.MEMORY_ONLY());
  }
@@ -1952,21 +1671,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         LongWritable.class,
         Text.class,
         TextInputFormat.class,
-        new Function<Path, Boolean>() {
-          @Override
-          public Boolean call(Path v1) {
-            return Boolean.TRUE;
-          }
-        },
+        v1 -> Boolean.TRUE,
        true);
 
-    JavaDStream<String> test = inputStream.map(
-        new Function<Tuple2<LongWritable, Text>, String>() {
-          @Override
-          public String call(Tuple2<LongWritable, Text> v1) {
-            return v1._2().toString();
-          }
-        });
+    JavaDStream<String> test = inputStream.map(v1 -> v1._2().toString());
     JavaTestUtils.attachTestOutputStream(test);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
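Two Java 8 rules do the remaining work in this patch. Locals captured by lambdas only need to be effectively final, which is why the diff can drop the final modifier from accum, dataCounter, collectedOutputs and friends; and a lambda that merely forwards to an existing method can shrink to a method reference, as in String::length, Tuple2::swap, Collections::sort and this::receive. A small self-contained sketch of both rules (the names here are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class LambdaRulesSketch {
      public static void main(String[] args) {
        // Effectively final: never reassigned after initialization, so the
        // lambda may capture it without an explicit final modifier.
        AtomicLong counter = new AtomicLong(0);
        List<String> words = Arrays.asList("a", "b", "c");
        words.forEach(w -> counter.incrementAndGet());
        System.out.println(counter.get());  // 3

        // A lambda that only calls an existing method can be written as a
        // method reference; the two lines below are equivalent.
        words.stream().map(s -> s.length()).forEach(System.out::println);
        words.stream().map(String::length).forEach(System.out::println);
      }
    }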
