Repository: spark
Updated Branches:
  refs/heads/master de14d35f7 -> 1487c9af2


http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index a94a37c..577672c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -96,12 +96,7 @@ public class JavaDatasetSuite implements Serializable {
   @Test
   public void testTypedFilterPreservingSchema() {
     Dataset<Long> ds = spark.range(10);
-    Dataset<Long> ds2 = ds.filter(new FilterFunction<Long>() {
-      @Override
-      public boolean call(Long value) throws Exception {
-        return value > 3;
-      }
-    });
+    Dataset<Long> ds2 = ds.filter((FilterFunction<Long>) value -> value > 3);
     Assert.assertEquals(ds.schema(), ds2.schema());
   }
 
@@ -111,44 +106,28 @@ public class JavaDatasetSuite implements Serializable {
     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
     Assert.assertEquals("hello", ds.first());
 
-    Dataset<String> filtered = ds.filter(new FilterFunction<String>() {
-      @Override
-      public boolean call(String v) throws Exception {
-        return v.startsWith("h");
-      }
-    });
+    Dataset<String> filtered = ds.filter((FilterFunction<String>) v -> v.startsWith("h"));
     Assert.assertEquals(Arrays.asList("hello"), filtered.collectAsList());
 
 
-    Dataset<Integer> mapped = ds.map(new MapFunction<String, Integer>() {
-      @Override
-      public Integer call(String v) throws Exception {
-        return v.length();
-      }
-    }, Encoders.INT());
+    Dataset<Integer> mapped = ds.map((MapFunction<String, Integer>) v -> v.length(), Encoders.INT());
     Assert.assertEquals(Arrays.asList(5, 5), mapped.collectAsList());
 
-    Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() {
-      @Override
-      public Iterator<String> call(Iterator<String> it) {
-        List<String> ls = new LinkedList<>();
-        while (it.hasNext()) {
-          ls.add(it.next().toUpperCase(Locale.ENGLISH));
-        }
-        return ls.iterator();
+    Dataset<String> parMapped = ds.mapPartitions((MapPartitionsFunction<String, String>) it -> {
+      List<String> ls = new LinkedList<>();
+      while (it.hasNext()) {
+        ls.add(it.next().toUpperCase(Locale.ENGLISH));
       }
+      return ls.iterator();
     }, Encoders.STRING());
     Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());
 
-    Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String s) {
-        List<String> ls = new LinkedList<>();
-        for (char c : s.toCharArray()) {
-          ls.add(String.valueOf(c));
-        }
-        return ls.iterator();
+    Dataset<String> flatMapped = ds.flatMap((FlatMapFunction<String, String>) s -> {
+      List<String> ls = new LinkedList<>();
+      for (char c : s.toCharArray()) {
+        ls.add(String.valueOf(c));
       }
+      return ls.iterator();
     }, Encoders.STRING());
     Assert.assertEquals(
       Arrays.asList("h", "e", "l", "l", "o", "w", "o", "r", "l", "d"),
@@ -157,16 +136,11 @@ public class JavaDatasetSuite implements Serializable {
 
   @Test
   public void testForeach() {
-    final LongAccumulator accum = jsc.sc().longAccumulator();
+    LongAccumulator accum = jsc.sc().longAccumulator();
     List<String> data = Arrays.asList("a", "b", "c");
     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
 
-    ds.foreach(new ForeachFunction<String>() {
-      @Override
-      public void call(String s) throws Exception {
-        accum.add(1);
-      }
-    });
+    ds.foreach((ForeachFunction<String>) s -> accum.add(1));
     Assert.assertEquals(3, accum.value().intValue());
   }
 
@@ -175,12 +149,7 @@ public class JavaDatasetSuite implements Serializable {
     List<Integer> data = Arrays.asList(1, 2, 3);
     Dataset<Integer> ds = spark.createDataset(data, Encoders.INT());
 
-    int reduced = ds.reduce(new ReduceFunction<Integer>() {
-      @Override
-      public Integer call(Integer v1, Integer v2) throws Exception {
-        return v1 + v2;
-      }
-    });
+    int reduced = ds.reduce((ReduceFunction<Integer>) (v1, v2) -> v1 + v2);
     Assert.assertEquals(6, reduced);
   }
 
@@ -189,52 +158,38 @@ public class JavaDatasetSuite implements Serializable {
     List<String> data = Arrays.asList("a", "foo", "bar");
     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
     KeyValueGroupedDataset<Integer, String> grouped = ds.groupByKey(
-      new MapFunction<String, Integer>() {
-        @Override
-        public Integer call(String v) throws Exception {
-          return v.length();
-        }
-      },
+        (MapFunction<String, Integer>) v -> v.length(),
       Encoders.INT());
 
-    Dataset<String> mapped = grouped.mapGroups(new MapGroupsFunction<Integer, String, String>() {
-      @Override
-      public String call(Integer key, Iterator<String> values) throws Exception {
-        StringBuilder sb = new StringBuilder(key.toString());
-        while (values.hasNext()) {
-          sb.append(values.next());
-        }
-        return sb.toString();
+    Dataset<String> mapped = grouped.mapGroups((MapGroupsFunction<Integer, String, String>) (key, values) -> {
+      StringBuilder sb = new StringBuilder(key.toString());
+      while (values.hasNext()) {
+        sb.append(values.next());
       }
+      return sb.toString();
     }, Encoders.STRING());
 
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList()));
 
     Dataset<String> flatMapped = grouped.flatMapGroups(
-      new FlatMapGroupsFunction<Integer, String, String>() {
-        @Override
-        public Iterator<String> call(Integer key, Iterator<String> values) {
+        (FlatMapGroupsFunction<Integer, String, String>) (key, values) -> {
           StringBuilder sb = new StringBuilder(key.toString());
           while (values.hasNext()) {
             sb.append(values.next());
           }
           return Collections.singletonList(sb.toString()).iterator();
-        }
-      },
+        },
       Encoders.STRING());
 
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped.collectAsList()));
 
     Dataset<String> mapped2 = grouped.mapGroupsWithState(
-      new MapGroupsWithStateFunction<Integer, String, Long, String>() {
-        @Override
-        public String call(Integer key, Iterator<String> values, KeyedState<Long> s) {
+        (MapGroupsWithStateFunction<Integer, String, Long, String>) (key, values, s) -> {
           StringBuilder sb = new StringBuilder(key.toString());
           while (values.hasNext()) {
             sb.append(values.next());
           }
           return sb.toString();
-        }
         },
         Encoders.LONG(),
         Encoders.STRING());
@@ -242,27 +197,19 @@ public class JavaDatasetSuite implements Serializable {
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped2.collectAsList()));
 
     Dataset<String> flatMapped2 = grouped.flatMapGroupsWithState(
-      new FlatMapGroupsWithStateFunction<Integer, String, Long, String>() {
-        @Override
-        public Iterator<String> call(Integer key, Iterator<String> values, KeyedState<Long> s) {
+        (FlatMapGroupsWithStateFunction<Integer, String, Long, String>) (key, values, s) -> {
           StringBuilder sb = new StringBuilder(key.toString());
           while (values.hasNext()) {
             sb.append(values.next());
           }
           return Collections.singletonList(sb.toString()).iterator();
-        }
-      },
+        },
       Encoders.LONG(),
       Encoders.STRING());
 
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped2.collectAsList()));
 
-    Dataset<Tuple2<Integer, String>> reduced = grouped.reduceGroups(new ReduceFunction<String>() {
-      @Override
-      public String call(String v1, String v2) throws Exception {
-        return v1 + v2;
-      }
-    });
+    Dataset<Tuple2<Integer, String>> reduced = grouped.reduceGroups((ReduceFunction<String>) (v1, v2) -> v1 + v2);
 
     Assert.assertEquals(
       asSet(tuple2(1, "a"), tuple2(3, "foobar")),
@@ -271,29 +218,21 @@ public class JavaDatasetSuite implements Serializable {
     List<Integer> data2 = Arrays.asList(2, 6, 10);
     Dataset<Integer> ds2 = spark.createDataset(data2, Encoders.INT());
     KeyValueGroupedDataset<Integer, Integer> grouped2 = ds2.groupByKey(
-      new MapFunction<Integer, Integer>() {
-        @Override
-        public Integer call(Integer v) throws Exception {
-          return v / 2;
-        }
-      },
+        (MapFunction<Integer, Integer>) v -> v / 2,
       Encoders.INT());
 
     Dataset<String> cogrouped = grouped.cogroup(
       grouped2,
-      new CoGroupFunction<Integer, String, Integer, String>() {
-        @Override
-      public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Integer> right) {
-          StringBuilder sb = new StringBuilder(key.toString());
-          while (left.hasNext()) {
-            sb.append(left.next());
-          }
-          sb.append("#");
-          while (right.hasNext()) {
-            sb.append(right.next());
-          }
-          return Collections.singletonList(sb.toString()).iterator();
+      (CoGroupFunction<Integer, String, Integer, String>) (key, left, right) -> {
+        StringBuilder sb = new StringBuilder(key.toString());
+        while (left.hasNext()) {
+          sb.append(left.next());
+        }
+        sb.append("#");
+        while (right.hasNext()) {
+          sb.append(right.next());
         }
+        return Collections.singletonList(sb.toString()).iterator();
       },
       Encoders.STRING());
 
@@ -703,11 +642,11 @@ public class JavaDatasetSuite implements Serializable {
     obj1.setD(new String[]{"hello", null});
     obj1.setE(Arrays.asList("a", "b"));
     obj1.setF(Arrays.asList(100L, null, 200L));
-    Map<Integer, String> map1 = new HashMap<Integer, String>();
+    Map<Integer, String> map1 = new HashMap<>();
     map1.put(1, "a");
     map1.put(2, "b");
     obj1.setG(map1);
-    Map<String, String> nestedMap1 = new HashMap<String, String>();
+    Map<String, String> nestedMap1 = new HashMap<>();
     nestedMap1.put("x", "1");
     nestedMap1.put("y", "2");
     Map<List<Long>, Map<String, String>> complexMap1 = new HashMap<>();
@@ -721,11 +660,11 @@ public class JavaDatasetSuite implements Serializable {
     obj2.setD(new String[]{null, "world"});
     obj2.setE(Arrays.asList("x", "y"));
     obj2.setF(Arrays.asList(300L, null, 400L));
-    Map<Integer, String> map2 = new HashMap<Integer, String>();
+    Map<Integer, String> map2 = new HashMap<>();
     map2.put(3, "c");
     map2.put(4, "d");
     obj2.setG(map2);
-    Map<String, String> nestedMap2 = new HashMap<String, String>();
+    Map<String, String> nestedMap2 = new HashMap<>();
     nestedMap2.put("q", "1");
     nestedMap2.put("w", "2");
     Map<List<Long>, Map<String, String>> complexMap2 = new HashMap<>();
@@ -1328,7 +1267,7 @@ public class JavaDatasetSuite implements Serializable {
   @Test
   public void test() {
     /* SPARK-15285 Large numbers of Nested JavaBeans generates more than 64KB java bytecode */
-    List<NestedComplicatedJavaBean> data = new ArrayList<NestedComplicatedJavaBean>();
+    List<NestedComplicatedJavaBean> data = new ArrayList<>();
     data.add(NestedComplicatedJavaBean.newBuilder().build());
 
     NestedComplicatedJavaBean obj3 = new NestedComplicatedJavaBean();

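A note on the pattern above (an illustrative aside, not part of the commit): the lambdas stay cast to the Java functional interfaces (FilterFunction, MapFunction, and so on) because Dataset methods such as filter and map also carry Scala-function overloads, so a bare lambda can be ambiguous to javac. A minimal sketch under that assumption, with a hypothetical local SparkSession:

    import org.apache.spark.api.java.function.FilterFunction;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    // Inside a test method; spark is assumed to be a local session.
    SparkSession spark = SparkSession.builder().master("local").getOrCreate();
    Dataset<Long> ds = spark.range(10);
    // The cast pins the Java-specific filter(FilterFunction<T>) overload.
    Dataset<Long> evens = ds.filter((FilterFunction<Long>) v -> v % 2 == 0);
    // map additionally needs an Encoder for the result type.
    Dataset<String> labels = ds.map((MapFunction<Long, String>) v -> "n=" + v, Encoders.STRING());
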
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
index bbaac5a..250fa67 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
@@ -27,7 +27,6 @@ import org.junit.Test;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.api.java.UDF1;
 import org.apache.spark.sql.api.java.UDF2;
 import org.apache.spark.sql.types.DataTypes;
 
@@ -54,16 +53,7 @@ public class JavaUDFSuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void udf1Test() {
-    // With Java 8 lambdas:
-    // sqlContext.registerFunction(
-    //   "stringLengthTest", (String str) -> str.length(), DataType.IntegerType);
-
-    spark.udf().register("stringLengthTest", new UDF1<String, Integer>() {
-      @Override
-      public Integer call(String str) {
-        return str.length();
-      }
-    }, DataTypes.IntegerType);
+    spark.udf().register("stringLengthTest", (String str) -> str.length(), DataTypes.IntegerType);
 
     Row result = spark.sql("SELECT stringLengthTest('test')").head();
     Assert.assertEquals(4, result.getInt(0));
@@ -72,18 +62,8 @@ public class JavaUDFSuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void udf2Test() {
-    // With Java 8 lambdas:
-    // sqlContext.registerFunction(
-    //   "stringLengthTest",
-    //   (String str1, String str2) -> str1.length() + str2.length,
-    //   DataType.IntegerType);
-
-    spark.udf().register("stringLengthTest", new UDF2<String, String, Integer>() {
-      @Override
-      public Integer call(String str1, String str2) {
-        return str1.length() + str2.length();
-      }
-    }, DataTypes.IntegerType);
+    spark.udf().register("stringLengthTest",
+        (String str1, String str2) -> str1.length() + str2.length(), DataTypes.IntegerType);
 
     Row result = spark.sql("SELECT stringLengthTest('test', 'test2')").head();
     Assert.assertEquals(9, result.getInt(0));
@@ -91,8 +71,8 @@ public class JavaUDFSuite implements Serializable {
 
   public static class StringLengthTest implements UDF2<String, String, Integer> {
     @Override
-    public Integer call(String str1, String str2) throws Exception {
-      return new Integer(str1.length() + str2.length());
+    public Integer call(String str1, String str2) {
+      return str1.length() + str2.length();
     }
   }
 
@@ -113,12 +93,7 @@ public class JavaUDFSuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void udf4Test() {
-    spark.udf().register("inc", new UDF1<Long, Long>() {
-      @Override
-      public Long call(Long i) {
-        return i + 1;
-      }
-    }, DataTypes.LongType);
+    spark.udf().register("inc", (Long i) -> i + 1, DataTypes.LongType);
 
     spark.range(10).toDF("x").createOrReplaceTempView("tmp");
     // This tests when Java UDFs are required to be the semantically same (See SPARK-9435).

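For reference, the lambda-registered UDF above is used exactly like the anonymous-class version it replaces; a short sketch mirroring udf1Test (assumes an existing SparkSession named spark):

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.types.DataTypes;

    // Register a one-argument UDF as a lambda and call it from SQL.
    spark.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);
    Row row = spark.sql("SELECT strLen('spark')").head();
    // row.getInt(0) is 5

The explicit (String s) parameter type is kept, as in the diff, which helps the compiler resolve the heavily overloaded register() method.
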
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 9b77010..cb8ed83 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -27,9 +27,6 @@ import java.util.Set;
 import scala.Tuple2;
 
 import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.util.ManualClock;
 import org.junit.Assert;
@@ -53,18 +50,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
     JavaPairDStream<String, Integer> wordsDstream = null;
 
     Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mappingFunc =
-        new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
-          @Override
-          public Optional<Double> call(
-              Time time, String word, Optional<Integer> one, State<Boolean> state) {
-            // Use all State's methods here
-            state.exists();
-            state.get();
-            state.isTimingOut();
-            state.remove();
-            state.update(true);
-            return Optional.of(2.0);
-          }
+        (time, word, one, state) -> {
+          // Use all State's methods here
+          state.exists();
+          state.get();
+          state.isTimingOut();
+          state.remove();
+          state.update(true);
+          return Optional.of(2.0);
         };
 
     JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
@@ -78,17 +71,14 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
     stateDstream.stateSnapshots();
 
     Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
-        new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
-          @Override
-          public Double call(String key, Optional<Integer> one, State<Boolean> state) {
-            // Use all State's methods here
-            state.exists();
-            state.get();
-            state.isTimingOut();
-            state.remove();
-            state.update(true);
-            return 2.0;
-          }
+        (key, one, state) -> {
+          // Use all State's methods here
+          state.exists();
+          state.get();
+          state.isTimingOut();
+          state.remove();
+          state.update(true);
+          return 2.0;
         };
 
     JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
@@ -136,13 +126,10 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
     );
 
     Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
-        new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
-          @Override
-          public Integer call(String key, Optional<Integer> value, State<Integer> state) {
-            int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
-            state.update(sum);
-            return sum;
-          }
+        (key, value, state) -> {
+          int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
+          state.update(sum);
+          return sum;
         };
     testOperation(
         inputData,
@@ -159,29 +146,15 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
     int numBatches = expectedOutputs.size();
     JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
     JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
-        JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, Tuple2<K, Integer>>() {
-          @Override
-          public Tuple2<K, Integer> call(K x) {
-            return new Tuple2<>(x, 1);
-          }
-        })).mapWithState(mapWithStateSpec);
-
-    final List<Set<T>> collectedOutputs =
+        JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
+
+    List<Set<T>> collectedOutputs =
         Collections.synchronizedList(new ArrayList<Set<T>>());
-    mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
-      @Override
-      public void call(JavaRDD<T> rdd) {
-        collectedOutputs.add(Sets.newHashSet(rdd.collect()));
-      }
-    });
-    final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
+    mapWithStateDStream.foreachRDD(rdd -> collectedOutputs.add(Sets.newHashSet(rdd.collect())));
+    List<Set<Tuple2<K, S>>> collectedStateSnapshots =
         Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>());
-    mapWithStateDStream.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() {
-      @Override
-      public void call(JavaPairRDD<K, S> rdd) {
-        collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));
-      }
-    });
+    mapWithStateDStream.stateSnapshots().foreachRDD(rdd ->
+        collectedStateSnapshots.add(Sets.newHashSet(rdd.collect())));
     BatchCounter batchCounter = new BatchCounter(ssc.ssc());
     ssc.start();
     ((ManualClock) ssc.ssc().scheduler().clock())

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 091ccbf..9156047 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -58,24 +58,16 @@ public class JavaReceiverAPISuite implements Serializable {
     TestServer server = new TestServer(0);
     server.start();
 
-    final AtomicLong dataCounter = new AtomicLong(0);
+    AtomicLong dataCounter = new AtomicLong(0);
 
     try {
       JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
       JavaReceiverInputDStream<String> input =
         ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
-      JavaDStream<String> mapped = input.map(new Function<String, String>() {
-        @Override
-        public String call(String v1) {
-          return v1 + ".";
-        }
-      });
-      mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-        @Override
-        public void call(JavaRDD<String> rdd) {
-          long count = rdd.count();
-          dataCounter.addAndGet(count);
-        }
+      JavaDStream<String> mapped = input.map((Function<String, String>) v1 -> v1 + ".");
+      mapped.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
+        long count = rdd.count();
+        dataCounter.addAndGet(count);
       });
 
       ssc.start();
@@ -110,11 +102,7 @@ public class JavaReceiverAPISuite implements Serializable {
 
     @Override
     public void onStart() {
-      new Thread()  {
-        @Override public void run() {
-          receive();
-        }
-      }.start();
+      new Thread(this::receive).start();
     }
 
     @Override

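The onStart() change above leans on a method reference standing in for a Runnable; a rough equivalence sketch (receive() is the suite's own method):

    // Before: an anonymous Thread subclass overriding run().
    // After: hand the Thread constructor a Runnable instead.
    Runnable work = this::receive;    // same as () -> this.receive()
    new Thread(work).start();         // equivalent to new Thread(this::receive).start()
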
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index f02fa87..3f4e6dd 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import org.apache.spark.SparkConf;
 import org.apache.spark.network.util.JavaUtils;
@@ -81,12 +80,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
 
   @Override
   public Iterator<ByteBuffer> readAll() {
-    return Iterators.transform(records.iterator(), new Function<Record,ByteBuffer>() {
-      @Override
-      public ByteBuffer apply(Record input) {
-        return input.buffer;
-      }
-    });
+    return Iterators.transform(records.iterator(), input -> input.buffer);
   }
 
   @Override
@@ -114,7 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
     String data1 = "data1";
     WriteAheadLogRecordHandle handle = wal.write(JavaUtils.stringToBytes(data1), 1234);
     Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
-    Assert.assertEquals(JavaUtils.bytesToString(wal.read(handle)), data1);
+    Assert.assertEquals(data1, JavaUtils.bytesToString(wal.read(handle)));
 
     wal.write(JavaUtils.stringToBytes("data2"), 1235);
     wal.write(JavaUtils.stringToBytes("data3"), 1236);

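Two small things worth noting in this file besides the lambda: Guava's com.google.common.base.Function has a single abstract method, so a plain lambda satisfies Iterators.transform and the explicit import can go, and the last hunk also puts the JUnit arguments in the documented (expected, actual) order so a failure message reads correctly:

    // expected value first, computed value second
    Assert.assertEquals(data1, JavaUtils.bytesToString(wal.read(handle)));
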
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 646cb97..9948a40 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -28,7 +28,6 @@ import org.apache.spark.streaming.StateSpec;
 import org.apache.spark.streaming.Time;
 import scala.Tuple2;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
@@ -101,7 +100,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       while (in.hasNext()) {
         out = out + in.next().toUpperCase();
       }
-      return Lists.newArrayList(out).iterator();
+      return Arrays.asList(out).iterator();
     });
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -240,7 +239,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     JavaTestUtils.attachTestOutputStream(joined);
     List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-    List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+    List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
     for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
       unorderedResult.add(Sets.newHashSet(res));
     }
@@ -315,7 +314,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
       JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
 
-    List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+    List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
 
     // This is just to test whether this transform to JavaStream compiles
     JavaDStream<Long> transformed1 = ssc.transform(
@@ -325,7 +324,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     });
 
     List<JavaDStream<?>> listOfDStreams2 =
-      Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+      Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
 
     JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
       listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
@@ -358,7 +357,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<String> flatMapped = stream.flatMap(
-        s -> Lists.newArrayList(s.split("(?!^)")).iterator());
+        s -> Arrays.asList(s.split("(?!^)")).iterator());
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -401,7 +400,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
-      List<Tuple2<Integer, String>> out = Lists.newArrayList();
+      List<Tuple2<Integer, String>> out = new ArrayList<>();
       for (String letter : s.split("(?!^)")) {
         out.add(new Tuple2<>(s.length(), letter));
       }
@@ -420,7 +419,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
    */
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
     List<List<T>> expected, List<List<T>> actual) {
-    expected.forEach(list -> Collections.sort(list));
+    expected.forEach(Collections::sort);
     List<List<T>> sortedActual = new ArrayList<>();
     actual.forEach(list -> {
         List<T> sortedList = new ArrayList<>(list);
@@ -491,7 +490,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -543,7 +542,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
+    JavaDStream<Integer> reversed = pairStream.map(Tuple2::_2);
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -629,7 +628,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
+    JavaPairDStream<String, Integer> combined = pairStream.combineByKey(i -> i,
       (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
 
     JavaTestUtils.attachTestOutputStream(combined);

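Most of the remaining churn in this file swaps Guava collection helpers and explicit type witnesses for plain JDK calls; a small sketch of the equivalences relied on above (illustrative only):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    // Lists.newArrayList()              ->  new ArrayList<>()
    List<String> out = new ArrayList<>();
    // Lists.newArrayList(x).iterator()  ->  Arrays.asList(x).iterator()
    Iterator<String> it = Arrays.asList("HELLO").iterator();
    // Arrays.<JavaDStream<?>>asList(a, b) -> Arrays.asList(a, b); with Java 8
    // target typing the type argument is inferred from the assignment context.
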
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index 8d24104..b966cbd 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,6 @@ import org.apache.spark.streaming.Time;
 import scala.Tuple2;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -123,12 +122,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(9,4));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
-        @Override
-        public Integer call(String s) {
-          return s.length();
-        }
-    });
+    JavaDStream<Integer> letterCount = stream.map(String::length);
     JavaTestUtils.attachTestOutputStream(letterCount);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -194,12 +188,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList("yankees"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
-      @Override
-      public Boolean call(String s) {
-        return s.contains("a");
-      }
-    });
+    JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
     JavaTestUtils.attachTestOutputStream(filtered);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -276,17 +265,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList("YANKEESRED SOX"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> mapped = stream.mapPartitions(
-        new FlatMapFunction<Iterator<String>, String>() {
-          @Override
-          public Iterator<String> call(Iterator<String> in) {
-            StringBuilder out = new StringBuilder();
-            while (in.hasNext()) {
-              out.append(in.next().toUpperCase(Locale.ENGLISH));
-            }
-            return Arrays.asList(out.toString()).iterator();
-          }
-        });
+    JavaDStream<String> mapped = stream.mapPartitions(in -> {
+        StringBuilder out = new StringBuilder();
+        while (in.hasNext()) {
+          out.append(in.next().toUpperCase(Locale.ENGLISH));
+        }
+        return Arrays.asList(out.toString()).iterator();
+      });
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -416,18 +401,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(9,10,11));
 
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> transformed = stream.transform(
-      new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
-        @Override
-        public JavaRDD<Integer> call(JavaRDD<Integer> in) {
-          return in.map(new Function<Integer, Integer>() {
-            @Override
-            public Integer call(Integer i) {
-              return i + 2;
-            }
-          });
-        }
-      });
+    JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
 
     JavaTestUtils.attachTestOutputStream(transformed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -448,71 +422,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
 
-    stream.transform(
-        new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
-          @Override
-          public JavaRDD<Integer> call(JavaRDD<Integer> in) {
-            return null;
-          }
-        }
-    );
+    stream.transform(in -> null);
 
-    stream.transform(
-      new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
-        @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) {
-          return null;
-        }
-      }
-    );
+    stream.transform((in, time) -> null);
 
-    stream.transformToPair(
-        new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
-          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) {
-            return null;
-          }
-        }
-    );
+    stream.transformToPair(in -> null);
 
-    stream.transformToPair(
-        new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
-          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) {
-            return null;
-          }
-        }
-    );
+    stream.transformToPair((in, time) -> null);
 
-    pairStream.transform(
-        new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
-          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) {
-            return null;
-          }
-        }
-    );
+    pairStream.transform(in -> null);
 
-    pairStream.transform(
-        new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
-          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream.transform((in, time) -> null);
 
-    pairStream.transformToPair(
-        new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
-          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) {
-            return null;
-          }
-        }
-    );
+    pairStream.transformToPair(in -> null);
 
-    pairStream.transformToPair(
-        new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
-          @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in,
-                                                            Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream.transformToPair((in, time) -> null);
 
   }
 
@@ -558,19 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
         pairStream2,
-        new Function3<
-            JavaPairRDD<String, String>,
-            JavaPairRDD<String, String>,
-            Time,
-            JavaPairRDD<String, Tuple2<String, String>>>() {
-          @Override
-          public JavaPairRDD<String, Tuple2<String, String>> call(
-              JavaPairRDD<String, String> rdd1,
-              JavaPairRDD<String, String> rdd2,
-              Time time) {
-            return rdd1.join(rdd2);
-          }
-        }
+        (rdd1, rdd2, time) -> rdd1.join(rdd2)
     );
 
     JavaTestUtils.attachTestOutputStream(joined);
@@ -603,100 +515,21 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
 
-    stream1.transformWith(
-        stream2,
-        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
-          @Override
-          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
-            return null;
-          }
-        }
-    );
+    stream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
 
-    stream1.transformWith(
-        pairStream1,
-        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
-          @Override
-          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2,
-                                      Time time) {
-            return null;
-          }
-        }
-    );
+    stream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
 
-    stream1.transformWithToPair(
-        stream2,
-        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
-          @Override
-          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2,
-                                                  Time time) {
-            return null;
-          }
-        }
-    );
+    stream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
 
-    stream1.transformWithToPair(
-        pairStream1,
-        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time,
-          JavaPairRDD<Double, Double>>() {
-          @Override
-          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1,
-                                                  JavaPairRDD<String, Integer> rdd2,
-                                                  Time time) {
-            return null;
-          }
-        }
-    );
+    stream1.transformWithToPair(pairStream1, (rdd1, rdd2, time) -> null);
 
-    pairStream1.transformWith(
-        stream2,
-        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
-          @Override
-          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2,
-                                      Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream1.transformWith(stream2, (rdd1, rdd2, time) -> null);
 
-    pairStream1.transformWith(
-        pairStream1,
-        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time,
-          JavaRDD<Double>>() {
-          @Override
-          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1,
-                                      JavaPairRDD<String, Integer> rdd2,
-                                      Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream1.transformWith(pairStream1, (rdd1, rdd2, time) -> null);
 
-    pairStream1.transformWithToPair(
-        stream2,
-        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time,
-          JavaPairRDD<Double, Double>>() {
-          @Override
-          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
-                                                  JavaRDD<String> rdd2,
-                                                  Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream1.transformWithToPair(stream2, (rdd1, rdd2, time) -> null);
 
-    pairStream1.transformWithToPair(
-        pairStream2,
-        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time,
-          JavaPairRDD<Double, Double>>() {
-          @Override
-          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
-                                                  JavaPairRDD<Double, Character> rdd2,
-                                                  Time time) {
-            return null;
-          }
-        }
-    );
+    pairStream1.transformWithToPair(pairStream2, (rdd1, rdd2, time) -> null);
   }
 
   @SuppressWarnings("unchecked")
@@ -727,44 +560,32 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
         JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
 
-    List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+    List<JavaDStream<?>> listOfDStreams1 = Arrays.asList(stream1, stream2);
 
     // This is just to test whether this transform to JavaStream compiles
     ssc.transform(
       listOfDStreams1,
-      new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
-        @Override
-        public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
-          Assert.assertEquals(2, listOfRDDs.size());
-          return null;
-        }
+      (listOfRDDs, time) -> {
+        Assert.assertEquals(2, listOfRDDs.size());
+        return null;
       }
     );
 
     List<JavaDStream<?>> listOfDStreams2 =
-        Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+        Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
 
     JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
       listOfDStreams2,
-      new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
-        @Override
-        public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs,
-                                                                  Time time) {
-          Assert.assertEquals(3, listOfRDDs.size());
-          JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
-          JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
-          JavaRDD<Tuple2<Integer, String>> rdd3 =
-            (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
-          JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
-          PairFunction<Integer, Integer, Integer> mapToTuple =
-            new PairFunction<Integer, Integer, Integer>() {
-            @Override
-            public Tuple2<Integer, Integer> call(Integer i) {
-              return new Tuple2<>(i, i);
-            }
-          };
-          return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
-        }
+      (listOfRDDs, time) -> {
+        Assert.assertEquals(3, listOfRDDs.size());
+        JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
+        JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
+        JavaRDD<Tuple2<Integer, String>> rdd3 =
+          (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
+        JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+        PairFunction<Integer, Integer, Integer> mapToTuple =
+            (PairFunction<Integer, Integer, Integer>) i -> new Tuple2<>(i, i);
+        return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
       }
     );
     JavaTestUtils.attachTestOutputStream(transformed2);
@@ -787,12 +608,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList("a","t","h","l","e","t","i","c","s"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(x.split("(?!^)")).iterator();
-      }
-    });
+    JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -811,25 +627,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
 
-    stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
-      @Override
-      public void call(JavaRDD<Integer> rdd) {
-        accumRdd.add(1);
-        rdd.foreach(new VoidFunction<Integer>() {
-          @Override
-          public void call(Integer i) {
-            accumEle.add(1);
-          }
-        });
-      }
+    stream.foreachRDD(rdd -> {
+      accumRdd.add(1);
+      rdd.foreach(i -> accumEle.add(1));
     });
 
     // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
-    stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
-      @Override
-      public void call(JavaRDD<Integer> rdd, Time time) {
-      }
-    });
+    stream.foreachRDD((rdd, time) -> {});
 
     JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -873,16 +677,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
             new Tuple2<>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
-      new PairFlatMapFunction<String, Integer, String>() {
-        @Override
-        public Iterator<Tuple2<Integer, String>> call(String in) {
-          List<Tuple2<Integer, String>> out = new ArrayList<>();
-          for (String letter: in.split("(?!^)")) {
-            out.add(new Tuple2<>(in.length(), letter));
-          }
-          return out.iterator();
+    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(in -> {
+        List<Tuple2<Integer, String>> out = new ArrayList<>();
+        for (String letter : in.split("(?!^)")) {
+          out.add(new Tuple2<>(in.length(), letter));
         }
+        return out.iterator();
       });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -949,21 +749,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(new Tuple2<>("yankees", 7)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
-        new PairFunction<String, String, Integer>() {
-          @Override
-          public Tuple2<String, Integer> call(String in) {
-            return new Tuple2<>(in, in.length());
-          }
-        });
+    JavaPairDStream<String, Integer> pairStream =
+        stream.mapToPair(in -> new Tuple2<>(in, in.length()));
 
-    JavaPairDStream<String, Integer> filtered = pairStream.filter(
-        new Function<Tuple2<String, Integer>, Boolean>() {
-      @Override
-      public Boolean call(Tuple2<String, Integer> in) {
-        return in._1().contains("a");
-      }
-    });
+    JavaPairDStream<String, Integer> filtered = pairStream.filter(in -> in._1().contains("a"));
     JavaTestUtils.attachTestOutputStream(filtered);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -1014,13 +803,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
-        new PairFunction<Tuple2<String, Integer>, Integer, String>() {
-          @Override
-          public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
-            return in.swap();
-          }
-        });
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(Tuple2::swap);
 
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1048,18 +831,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
-        new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
-          @Override
-          public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
-            List<Tuple2<Integer, String>> out = new LinkedList<>();
-            while (in.hasNext()) {
-              Tuple2<String, Integer> next = in.next();
-              out.add(next.swap());
-            }
-            return out.iterator();
-          }
-        });
+    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
+        List<Tuple2<Integer, String>> out = new LinkedList<>();
+        while (in.hasNext()) {
+          Tuple2<String, Integer> next = in.next();
+          out.add(next.swap());
+        }
+        return out.iterator();
+      });
 
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1079,13 +858,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaDStream<Integer> reversed = pairStream.map(
-        new Function<Tuple2<String, Integer>, Integer>() {
-          @Override
-          public Integer call(Tuple2<String, Integer> in) {
-            return in._2();
-          }
-        });
+    JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
 
     JavaTestUtils.attachTestOutputStream(reversed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1119,17 +892,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
-        new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
-          @Override
-          public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
-            List<Tuple2<Integer, String>> out = new LinkedList<>();
-            for (Character s : in._1().toCharArray()) {
-              out.add(new Tuple2<>(in._2(), s.toString()));
-            }
-            return out.iterator();
-          }
-        });
+    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
+        List<Tuple2<Integer, String>> out = new LinkedList<>();
+        for (Character s : in._1().toCharArray()) {
+          out.add(new Tuple2<>(in._2(), s.toString()));
+        }
+        return out.iterator();
+      });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -1216,12 +985,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
-        new Function<Integer, Integer>() {
-          @Override
-          public Integer call(Integer i) {
-            return i;
-          }
-        }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
+        i -> i, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
 
     JavaTestUtils.attachTestOutputStream(combined);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1345,20 +1109,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
-        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
-          @Override
-          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
-            int out = 0;
-            if (state.isPresent()) {
-              out += state.get();
-            }
-            for (Integer v : values) {
-              out += v;
-            }
-            return Optional.of(out);
-          }
-        });
+    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+        int out = 0;
+        if (state.isPresent()) {
+          out += state.get();
+        }
+        for (Integer v : values) {
+          out += v;
+        }
+        return Optional.of(out);
+      });
     JavaTestUtils.attachTestOutputStream(updated);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -1389,20 +1149,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
-        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
-          @Override
-          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
-            int out = 0;
-            if (state.isPresent()) {
-              out += state.get();
-            }
-            for (Integer v : values) {
-              out += v;
-            }
-            return Optional.of(out);
-          }
-        }, new HashPartitioner(1), initialRDD);
+    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+        int out = 0;
+        if (state.isPresent()) {
+          out += state.get();
+        }
+        for (Integer v : values) {
+          out += v;
+        }
+        return Optional.of(out);
+      }, new HashPartitioner(1), initialRDD);
     JavaTestUtils.attachTestOutputStream(updated);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
@@ -1500,13 +1256,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, inputData, 1);
     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
-        new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
-          @Override
-          public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) {
-            return in.sortByKey();
-          }
-        });
+    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
 
     JavaTestUtils.attachTestOutputStream(sorted);
     List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1537,18 +1287,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, inputData, 1);
     JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaDStream<Integer> firstParts = pairStream.transform(
-        new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
-          @Override
-          public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) {
-            return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
-              @Override
-              public Integer call(Tuple2<Integer, Integer> in2) {
-                return in2._1();
-              }
-            });
-          }
-        });
+    JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(in2 -> in2._1()));
 
     JavaTestUtils.attachTestOutputStream(firstParts);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1575,12 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, inputData, 1);
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
-      @Override
-      public String call(String s) {
-        return s.toUpperCase(Locale.ENGLISH);
-      }
-    });
+    JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
 
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1616,16 +1350,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
 
-    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
-        new Function<String, Iterable<String>>() {
-          @Override
-          public Iterable<String> call(String in) {
-            List<String> out = new ArrayList<>();
-            out.add(in + "1");
-            out.add(in + "2");
-            return out;
-          }
-        });
+    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
+        List<String> out = new ArrayList<>();
+        out.add(in + "1");
+        out.add(in + "2");
+        return out;
+      });
 
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -1795,12 +1525,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     ssc.checkpoint(tempDir.getAbsolutePath());
 
     JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
-      @Override
-      public Integer call(String s) {
-        return s.length();
-      }
-    });
+    JavaDStream<Integer> letterCount = stream.map(String::length);
     JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
     List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
 
@@ -1822,7 +1547,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   public void testContextGetOrCreate() throws InterruptedException {
     ssc.stop();
 
-    final SparkConf conf = new SparkConf()
+    SparkConf conf = new SparkConf()
         .setMaster("local[2]")
         .setAppName("test")
         .set("newContext", "true");
@@ -1835,13 +1560,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     // Function to create JavaStreamingContext without any output operations
     // (used to detect the new context)
-    final AtomicBoolean newContextCreated = new AtomicBoolean(false);
-    Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
-      @Override
-      public JavaStreamingContext call() {
-        newContextCreated.set(true);
-        return new JavaStreamingContext(conf, Seconds.apply(1));
-      }
+    AtomicBoolean newContextCreated = new AtomicBoolean(false);
+    Function0<JavaStreamingContext> creatingFunc = () -> {
+      newContextCreated.set(true);
+      return new JavaStreamingContext(conf, Seconds.apply(1));
     };
 
     newContextCreated.set(false);
@@ -1912,18 +1634,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     ssc.socketStream(
       "localhost",
       12345,
-      new Function<InputStream, Iterable<String>>() {
-        @Override
-        public Iterable<String> call(InputStream in) throws IOException {
-          List<String> out = new ArrayList<>();
-          try (BufferedReader reader = new BufferedReader(
-              new InputStreamReader(in, StandardCharsets.UTF_8))) {
-            for (String line; (line = reader.readLine()) != null;) {
-              out.add(line);
-            }
+      in -> {
+        List<String> out = new ArrayList<>();
+        try (BufferedReader reader = new BufferedReader(
+            new InputStreamReader(in, StandardCharsets.UTF_8))) {
+          for (String line; (line = reader.readLine()) != null;) {
+            out.add(line);
           }
-          return out;
         }
+        return out;
       },
       StorageLevel.MEMORY_ONLY());
   }
@@ -1952,21 +1671,10 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       LongWritable.class,
       Text.class,
       TextInputFormat.class,
-      new Function<Path, Boolean>() {
-        @Override
-        public Boolean call(Path v1) {
-          return Boolean.TRUE;
-        }
-      },
+      v1 -> Boolean.TRUE,
       true);
 
-    JavaDStream<String> test = inputStream.map(
-      new Function<Tuple2<LongWritable, Text>, String>() {
-        @Override
-        public String call(Tuple2<LongWritable, Text> v1) {
-          return v1._2().toString();
-        }
-    });
+    JavaDStream<String> test = inputStream.map(v1 -> v1._2().toString());
 
     JavaTestUtils.attachTestOutputStream(test);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
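
The change applied in every hunk above is the same mechanical refactoring: an
anonymous inner class implementing one of Spark's single-abstract-method Java
function interfaces is replaced by a lambda or a method reference. A minimal,
self-contained sketch of that pattern follows; it is not part of the patch, and
the Fn interface is a hypothetical stand-in for
org.apache.spark.api.java.function.Function so the example compiles without
Spark on the classpath.

import java.util.Locale;

public class LambdaRefactorSketch {

  // Hypothetical stand-in for Spark's Function<T, R> (single abstract method "call").
  @FunctionalInterface
  interface Fn<T, R> {
    R call(T t);
  }

  // Helper that invokes the functional interface, standing in for an API such
  // as map()/mapValues() that accepts one.
  static <T, R> R apply(Fn<T, R> f, T value) {
    return f.call(value);
  }

  public static void main(String[] args) {
    // Pre-Java-8 form, as removed throughout this diff: an anonymous inner class.
    Fn<String, Integer> lengthOld = new Fn<String, Integer>() {
      @Override
      public Integer call(String s) {
        return s.length();
      }
    };

    // Java 8 form, as added: a method reference (compare stream.map(String::length)).
    Fn<String, Integer> lengthNew = String::length;

    // Lambdas may capture local variables that are merely effectively final,
    // which is why the explicit `final` modifiers (e.g. on the SparkConf and
    // AtomicBoolean in testContextGetOrCreate) could be dropped.
    String suffix = "!";
    Fn<String, String> shout = s -> s.toUpperCase(Locale.ENGLISH) + suffix;

    System.out.println(apply(lengthOld, "hello"));  // 5
    System.out.println(apply(lengthNew, "world"));  // 5
    System.out.println(apply(shout, "hello"));      // HELLO!
  }
}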

