[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features

## What changes were proposed in this pull request?

Convert tests to use Java 8 lambdas, with modest related fixes to the surrounding code.
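
Most of the change is mechanical: anonymous single-method classes become lambdas, and `final` modifiers that were only needed for capture go away. A minimal sketch of the pattern (class and variable names here are illustrative, not taken from the diff):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LambdaConversionSketch {
  public static void main(String[] args) throws InterruptedException {
    AtomicInteger counter = new AtomicInteger();

    // Before (Java 7 style): an anonymous Runnable; captured locals had to be final.
    Thread java7Style = new Thread(new Runnable() {
      @Override
      public void run() {
        counter.incrementAndGet();
      }
    });

    // After (Java 8): a lambda; 'counter' only needs to be effectively final.
    Thread java8Style = new Thread(() -> counter.incrementAndGet());

    java7Style.start();
    java8Style.start();
    java7Style.join();
    java8Style.join();
    System.out.println(counter.get()); // prints 2
  }
}
```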

## How was this patch tested?

Jenkins tests

Author: Sean Owen <[email protected]>

Closes #16964 from srowen/SPARK-19534.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1487c9af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1487c9af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1487c9af

Branch: refs/heads/master
Commit: 1487c9af20a333ead55955acf4c0aa323bea0d07
Parents: de14d35
Author: Sean Owen <[email protected]>
Authored: Sun Feb 19 09:42:50 2017 -0800
Committer: Sean Owen <[email protected]>
Committed: Sun Feb 19 09:42:50 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/network/TransportContext.java  |   6 +-
 .../spark/network/util/MapConfigProvider.java   |   8 +-
 .../network/ChunkFetchIntegrationSuite.java     |  37 +-
 .../network/RequestTimeoutIntegrationSuite.java |   3 +-
 .../network/TransportClientFactorySuite.java    |  51 +-
 .../network/TransportResponseHandlerSuite.java  |  14 +-
 .../network/crypto/AuthIntegrationSuite.java    |  19 +-
 .../spark/network/sasl/SparkSaslSuite.java      |  65 +--
 .../util/TransportFrameDecoderSuite.java        |  44 +-
 .../network/sasl/SaslIntegrationSuite.java      |  34 +-
 .../ExternalShuffleBlockHandlerSuite.java       |   2 +-
 .../shuffle/ExternalShuffleCleanupSuite.java    |   6 +-
 .../ExternalShuffleIntegrationSuite.java        |  13 +-
 .../shuffle/OneForOneBlockFetcherSuite.java     |  78 ++-
 .../shuffle/RetryingBlockFetcherSuite.java      |  64 ++-
 .../unsafe/sort/UnsafeExternalSorter.java       |   1 -
 .../java/org/apache/spark/JavaJdbcRDDSuite.java |  26 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  |  65 +--
 .../map/AbstractBytesToBytesMapSuite.java       |  25 +-
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  25 +-
 .../test/org/apache/spark/Java8RDDAPISuite.java |   7 +-
 .../test/org/apache/spark/JavaAPISuite.java     | 492 ++++-------------
 .../kafka010/JavaConsumerStrategySuite.java     |  24 +-
 .../SparkSubmitCommandBuilderSuite.java         |   2 +-
 .../launcher/SparkSubmitOptionParserSuite.java  |   8 +-
 .../apache/spark/ml/feature/JavaPCASuite.java   |  35 +-
 .../classification/JavaNaiveBayesSuite.java     |  10 +-
 .../clustering/JavaBisectingKMeansSuite.java    |   4 +-
 .../spark/mllib/clustering/JavaLDASuite.java    |  40 +-
 .../mllib/fpm/JavaAssociationRulesSuite.java    |   6 +-
 .../regression/JavaLinearRegressionSuite.java   |  11 +-
 .../spark/mllib/tree/JavaDecisionTreeSuite.java |  15 +-
 .../SpecificParquetRecordReaderBase.java        |   2 +-
 .../spark/sql/Java8DatasetAggregatorSuite.java  |  16 +-
 .../apache/spark/sql/JavaApplySchemaSuite.java  |  22 +-
 .../apache/spark/sql/JavaDataFrameSuite.java    |  47 +-
 .../spark/sql/JavaDatasetAggregatorSuite.java   |  49 +-
 .../sql/JavaDatasetAggregatorSuiteBase.java     |  14 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 147 ++----
 .../test/org/apache/spark/sql/JavaUDFSuite.java |  37 +-
 .../spark/streaming/JavaMapWithStateSuite.java  |  81 +--
 .../spark/streaming/JavaReceiverAPISuite.java   |  24 +-
 .../spark/streaming/JavaWriteAheadLogSuite.java |  10 +-
 .../apache/spark/streaming/Java8APISuite.java   |  21 +-
 .../apache/spark/streaming/JavaAPISuite.java    | 526 +++++--------------
 45 files changed, 662 insertions(+), 1574 deletions(-)
----------------------------------------------------------------------
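
Besides lambdas, the diffs below repeatedly replace Guava collection factories with plain JDK equivalents and drop explicit generic type witnesses that Java 8's improved inference makes redundant. A rough sketch of those substitutions (variable names are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollectionFactorySketch {
  public static void main(String[] args) {
    // Lists.<Foo>newArrayList() -> new ArrayList<>(): the diamond infers the type.
    List<String> bootstraps = new ArrayList<>();

    // Lists.newArrayList(a, b) -> Arrays.asList(a, b): fixed-size, fine for fixtures.
    List<Integer> chunks = Arrays.asList(0, 1);

    // Maps.newHashMap(other) -> new HashMap<>(other): the JDK copy constructor.
    Map<String, String> config = new HashMap<>(Collections.singletonMap("k", "v"));

    // Collections.<String, String>emptyMap() -> Collections.emptyMap():
    // target typing now infers the type arguments.
    Map<String, String> empty = Collections.emptyMap();

    System.out.println(bootstraps + " " + chunks + " " + config + " " + empty);
  }
}
```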


http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 37ba543..965c4ae 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -17,9 +17,9 @@
 
 package org.apache.spark.network;
 
+import java.util.ArrayList;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import io.netty.channel.Channel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
@@ -100,7 +100,7 @@ public class TransportContext {
   }
 
   public TransportClientFactory createClientFactory() {
-    return createClientFactory(Lists.<TransportClientBootstrap>newArrayList());
+    return createClientFactory(new ArrayList<>());
   }
 
   /** Create a server which will attempt to bind to a specific port. */
@@ -120,7 +120,7 @@ public class TransportContext {
   }
 
   public TransportServer createServer() {
-    return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
+    return createServer(0, new ArrayList<>());
   }
 
   public TransportChannelHandler initializePipeline(SocketChannel channel) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
index b666799..9cfee7f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
@@ -17,22 +17,20 @@
 
 package org.apache.spark.network.util;
 
-import com.google.common.collect.Maps;
-
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
 /** ConfigProvider based on a Map (copied in the constructor). */
 public class MapConfigProvider extends ConfigProvider {
 
-  public static final MapConfigProvider EMPTY = new MapConfigProvider(
-      Collections.<String, String>emptyMap());
+  public static final MapConfigProvider EMPTY = new MapConfigProvider(Collections.emptyMap());
 
   private final Map<String, String> config;
 
   public MapConfigProvider(Map<String, String> config) {
-    this.config = Maps.newHashMap(config);
+    this.config = new HashMap<>(config);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index 5bb8819..824482a 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.network;
 import java.io.File;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -29,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closeables;
 import org.junit.AfterClass;
@@ -179,49 +179,49 @@ public class ChunkFetchIntegrationSuite {
 
   @Test
   public void fetchBufferChunk() throws Exception {
-    FetchResult res = fetchChunks(Lists.newArrayList(BUFFER_CHUNK_INDEX));
-    assertEquals(res.successChunks, Sets.newHashSet(BUFFER_CHUNK_INDEX));
+    FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX));
+    assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX), res.successChunks);
     assertTrue(res.failedChunks.isEmpty());
-    assertBufferListsEqual(res.buffers, Lists.newArrayList(bufferChunk));
+    assertBufferListsEqual(Arrays.asList(bufferChunk), res.buffers);
     res.releaseBuffers();
   }
 
   @Test
   public void fetchFileChunk() throws Exception {
-    FetchResult res = fetchChunks(Lists.newArrayList(FILE_CHUNK_INDEX));
-    assertEquals(res.successChunks, Sets.newHashSet(FILE_CHUNK_INDEX));
+    FetchResult res = fetchChunks(Arrays.asList(FILE_CHUNK_INDEX));
+    assertEquals(Sets.newHashSet(FILE_CHUNK_INDEX), res.successChunks);
     assertTrue(res.failedChunks.isEmpty());
-    assertBufferListsEqual(res.buffers, Lists.newArrayList(fileChunk));
+    assertBufferListsEqual(Arrays.asList(fileChunk), res.buffers);
     res.releaseBuffers();
   }
 
   @Test
   public void fetchNonExistentChunk() throws Exception {
-    FetchResult res = fetchChunks(Lists.newArrayList(12345));
+    FetchResult res = fetchChunks(Arrays.asList(12345));
     assertTrue(res.successChunks.isEmpty());
-    assertEquals(res.failedChunks, Sets.newHashSet(12345));
+    assertEquals(Sets.newHashSet(12345), res.failedChunks);
     assertTrue(res.buffers.isEmpty());
   }
 
   @Test
   public void fetchBothChunks() throws Exception {
-    FetchResult res = fetchChunks(Lists.newArrayList(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX));
-    assertEquals(res.successChunks, Sets.newHashSet(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX));
+    FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX));
+    assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX), res.successChunks);
     assertTrue(res.failedChunks.isEmpty());
-    assertBufferListsEqual(res.buffers, Lists.newArrayList(bufferChunk, fileChunk));
+    assertBufferListsEqual(Arrays.asList(bufferChunk, fileChunk), res.buffers);
     res.releaseBuffers();
   }
 
   @Test
   public void fetchChunkAndNonExistent() throws Exception {
-    FetchResult res = fetchChunks(Lists.newArrayList(BUFFER_CHUNK_INDEX, 12345));
-    assertEquals(res.successChunks, Sets.newHashSet(BUFFER_CHUNK_INDEX));
-    assertEquals(res.failedChunks, Sets.newHashSet(12345));
-    assertBufferListsEqual(res.buffers, Lists.newArrayList(bufferChunk));
+    FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX, 12345));
+    assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX), res.successChunks);
+    assertEquals(Sets.newHashSet(12345), res.failedChunks);
+    assertBufferListsEqual(Arrays.asList(bufferChunk), res.buffers);
     res.releaseBuffers();
   }
 
-  private void assertBufferListsEqual(List<ManagedBuffer> list0, List<ManagedBuffer> list1)
+  private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<ManagedBuffer> list1)
       throws Exception {
     assertEquals(list0.size(), list1.size());
     for (int i = 0; i < list0.size(); i ++) {
@@ -229,7 +229,8 @@ public class ChunkFetchIntegrationSuite {
     }
   }
 
-  private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception {
+  private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1)
+      throws Exception {
     ByteBuffer nio0 = buffer0.nioByteBuffer();
     ByteBuffer nio1 = buffer1.nioByteBuffer();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
index 959396b..9aa17e2 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network;
 
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
@@ -60,7 +59,7 @@ public class RequestTimeoutIntegrationSuite {
 
   @Before
   public void setUp() throws Exception {
-    Map<String, String> configMap = Maps.newHashMap();
+    Map<String, String> configMap = new HashMap<>();
     configMap.put("spark.shuffle.io.connectionTimeout", "10s");
     conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index 205ab88..e95d25f 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -19,19 +19,20 @@ package org.apache.spark.network;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.spark.network.client.TransportClient;
@@ -71,39 +72,36 @@ public class TransportClientFactorySuite {
    *
   * If concurrent is true, create multiple threads to create clients in parallel.
    */
-  private void testClientReuse(final int maxConnections, boolean concurrent)
+  private void testClientReuse(int maxConnections, boolean concurrent)
     throws IOException, InterruptedException {
 
-    Map<String, String> configMap = Maps.newHashMap();
+    Map<String, String> configMap = new HashMap<>();
     configMap.put("spark.shuffle.io.numConnectionsPerPeer", 
Integer.toString(maxConnections));
     TransportConf conf = new TransportConf("shuffle", new 
MapConfigProvider(configMap));
 
     RpcHandler rpcHandler = new NoOpRpcHandler();
     TransportContext context = new TransportContext(conf, rpcHandler);
-    final TransportClientFactory factory = context.createClientFactory();
-    final Set<TransportClient> clients = Collections.synchronizedSet(
+    TransportClientFactory factory = context.createClientFactory();
+    Set<TransportClient> clients = Collections.synchronizedSet(
       new HashSet<TransportClient>());
 
-    final AtomicInteger failed = new AtomicInteger();
+    AtomicInteger failed = new AtomicInteger();
     Thread[] attempts = new Thread[maxConnections * 10];
 
     // Launch a bunch of threads to create new clients.
     for (int i = 0; i < attempts.length; i++) {
-      attempts[i] = new Thread() {
-        @Override
-        public void run() {
-          try {
-            TransportClient client =
-              factory.createClient(TestUtils.getLocalHost(), server1.getPort());
-            assertTrue(client.isActive());
-            clients.add(client);
-          } catch (IOException e) {
-            failed.incrementAndGet();
-          } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-          }
+      attempts[i] = new Thread(() -> {
+        try {
+          TransportClient client =
+            factory.createClient(TestUtils.getLocalHost(), server1.getPort());
+          assertTrue(client.isActive());
+          clients.add(client);
+        } catch (IOException e) {
+          failed.incrementAndGet();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
         }
-      };
+      });
 
       if (concurrent) {
         attempts[i].start();
@@ -113,8 +111,8 @@ public class TransportClientFactorySuite {
     }
 
     // Wait until all the threads complete.
-    for (int i = 0; i < attempts.length; i++) {
-      attempts[i].join();
+    for (Thread attempt : attempts) {
+      attempt.join();
     }
 
     Assert.assertEquals(0, failed.get());
@@ -150,7 +148,7 @@ public class TransportClientFactorySuite {
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
     assertTrue(c1.isActive());
     assertTrue(c2.isActive());
-    assertTrue(c1 != c2);
+    assertNotSame(c1, c2);
     factory.close();
   }
 
@@ -167,7 +165,7 @@ public class TransportClientFactorySuite {
     assertFalse(c1.isActive());
 
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
-    assertFalse(c1 == c2);
+    assertNotSame(c1, c2);
     assertTrue(c2.isActive());
     factory.close();
   }
@@ -207,8 +205,7 @@ public class TransportClientFactorySuite {
       }
     });
     TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
-    TransportClientFactory factory = context.createClientFactory();
-    try {
+    try (TransportClientFactory factory = context.createClientFactory()) {
       TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
       assertTrue(c1.isActive());
       long expiredTime = System.currentTimeMillis() + 10000; // 10 seconds
@@ -216,8 +213,6 @@ public class TransportClientFactorySuite {
         Thread.sleep(10);
       }
       assertFalse(c1.isActive());
-    } finally {
-      factory.close();
     }
   }
 }
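
The suite above also swaps an explicit try/finally around factory.close() for try-with-resources. A self-contained sketch of that idiom (Resource is a hypothetical stand-in for any AutoCloseable such as TransportClientFactory):

```java
public class TryWithResourcesSketch {
  static class Resource implements AutoCloseable {
    void use() { System.out.println("using"); }
    @Override public void close() { System.out.println("closed"); }
  }

  public static void main(String[] args) {
    // Before: close() had to be called in a finally block.
    Resource r1 = new Resource();
    try {
      r1.use();
    } finally {
      r1.close();
    }

    // After: the compiler emits the close() call, even when use() throws.
    try (Resource r2 = new Resource()) {
      r2.use();
    }
  }
}
```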

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java
index 128f7cb..4477c9a 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java
@@ -24,8 +24,6 @@ import io.netty.channel.local.LocalChannel;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.*;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
@@ -54,7 +52,7 @@ public class TransportResponseHandlerSuite {
     assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new ChunkFetchSuccess(streamChunkId, new TestManagedBuffer(123)));
-    verify(callback, times(1)).onSuccess(eq(0), (ManagedBuffer) any());
+    verify(callback, times(1)).onSuccess(eq(0), any());
     assertEquals(0, handler.numOutstandingRequests());
   }
 
@@ -67,7 +65,7 @@ public class TransportResponseHandlerSuite {
     assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new ChunkFetchFailure(streamChunkId, "some error msg"));
-    verify(callback, times(1)).onFailure(eq(0), (Throwable) any());
+    verify(callback, times(1)).onFailure(eq(0), any());
     assertEquals(0, handler.numOutstandingRequests());
   }
 
@@ -84,9 +82,9 @@ public class TransportResponseHandlerSuite {
     handler.exceptionCaught(new Exception("duh duh duhhhh"));
 
     // should fail both b2 and b3
-    verify(callback, times(1)).onSuccess(eq(0), (ManagedBuffer) any());
-    verify(callback, times(1)).onFailure(eq(1), (Throwable) any());
-    verify(callback, times(1)).onFailure(eq(2), (Throwable) any());
+    verify(callback, times(1)).onSuccess(eq(0), any());
+    verify(callback, times(1)).onFailure(eq(1), any());
+    verify(callback, times(1)).onFailure(eq(2), any());
     assertEquals(0, handler.numOutstandingRequests());
   }
 
@@ -118,7 +116,7 @@ public class TransportResponseHandlerSuite {
     assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new RpcFailure(12345, "oh no"));
-    verify(callback, times(1)).onFailure((Throwable) any());
+    verify(callback, times(1)).onFailure(any());
     assertEquals(0, handler.numOutstandingRequests());
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java
index 21609d5..8751944 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java
@@ -18,11 +18,11 @@
 package org.apache.spark.network.crypto;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import io.netty.channel.Channel;
 import org.junit.After;
 import org.junit.Test;
@@ -163,20 +163,17 @@ public class AuthIntegrationSuite {
     }
 
     void createServer(String secret, boolean enableAes) throws Exception {
-      TransportServerBootstrap introspector = new TransportServerBootstrap() {
-        @Override
-        public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
-          AuthTestCtx.this.serverChannel = channel;
-          if (rpcHandler instanceof AuthRpcHandler) {
-            AuthTestCtx.this.authRpcHandler = (AuthRpcHandler) rpcHandler;
-          }
-          return rpcHandler;
+      TransportServerBootstrap introspector = (channel, rpcHandler) -> {
+        this.serverChannel = channel;
+        if (rpcHandler instanceof AuthRpcHandler) {
+          this.authRpcHandler = (AuthRpcHandler) rpcHandler;
         }
+        return rpcHandler;
       };
       SecretKeyHolder keyHolder = createKeyHolder(secret);
      TransportServerBootstrap auth = enableAes ? new AuthServerBootstrap(conf, keyHolder)
         : new SaslServerBootstrap(conf, keyHolder);
-      this.server = ctx.createServer(Lists.newArrayList(auth, introspector));
+      this.server = ctx.createServer(Arrays.asList(auth, introspector));
     }
 
     void createClient(String secret) throws Exception {
@@ -186,7 +183,7 @@ public class AuthIntegrationSuite {
     void createClient(String secret, boolean enableAes) throws Exception {
       TransportConf clientConf = enableAes ? conf
         : new TransportConf("rpc", MapConfigProvider.EMPTY);
-      List<TransportClientBootstrap> bootstraps = Lists.<TransportClientBootstrap>newArrayList(
+      List<TransportClientBootstrap> bootstraps = Arrays.asList(
         new AuthClientBootstrap(clientConf, appId, createKeyHolder(secret)));
       this.client = ctx.createClientFactory(bootstraps)
         .createClient(TestUtils.getLocalHost(), server.getPort());

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 87129b9..6f15718 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.*;
 import java.io.File;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -35,7 +36,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.security.sasl.SaslException;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import io.netty.buffer.ByteBuf;
@@ -45,8 +45,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.spark.network.TestUtils;
 import org.apache.spark.network.TransportContext;
@@ -137,18 +135,15 @@ public class SparkSaslSuite {
     testBasicSasl(true);
   }
 
-  private void testBasicSasl(boolean encrypt) throws Throwable {
+  private static void testBasicSasl(boolean encrypt) throws Throwable {
     RpcHandler rpcHandler = mock(RpcHandler.class);
-    doAnswer(new Answer<Void>() {
-        @Override
-        public Void answer(InvocationOnMock invocation) {
-          ByteBuffer message = (ByteBuffer) invocation.getArguments()[1];
-          RpcResponseCallback cb = (RpcResponseCallback) invocation.getArguments()[2];
-          assertEquals("Ping", JavaUtils.bytesToString(message));
-          cb.onSuccess(JavaUtils.stringToBytes("Pong"));
-          return null;
-        }
-      })
+    doAnswer(invocation -> {
+      ByteBuffer message = (ByteBuffer) invocation.getArguments()[1];
+      RpcResponseCallback cb = (RpcResponseCallback) invocation.getArguments()[2];
+      assertEquals("Ping", JavaUtils.bytesToString(message));
+      cb.onSuccess(JavaUtils.stringToBytes("Pong"));
+      return null;
+    })
       .when(rpcHandler)
       .receive(any(TransportClient.class), any(ByteBuffer.class), any(RpcResponseCallback.class));
 
@@ -255,21 +250,17 @@ public class SparkSaslSuite {
 
   @Test
   public void testFileRegionEncryption() throws Exception {
-    final Map<String, String> testConf = ImmutableMap.of(
+    Map<String, String> testConf = ImmutableMap.of(
       "spark.network.sasl.maxEncryptedBlockSize", "1k");
 
-    final AtomicReference<ManagedBuffer> response = new AtomicReference<>();
-    final File file = File.createTempFile("sasltest", ".txt");
+    AtomicReference<ManagedBuffer> response = new AtomicReference<>();
+    File file = File.createTempFile("sasltest", ".txt");
     SaslTestCtx ctx = null;
     try {
-      final TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf));
+      TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf));
       StreamManager sm = mock(StreamManager.class);
-      when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() {
-          @Override
-          public ManagedBuffer answer(InvocationOnMock invocation) {
-            return new FileSegmentManagedBuffer(conf, file, 0, file.length());
-          }
-        });
+      when(sm.getChunk(anyLong(), anyInt())).thenAnswer(invocation ->
+          new FileSegmentManagedBuffer(conf, file, 0, file.length()));
 
       RpcHandler rpcHandler = mock(RpcHandler.class);
       when(rpcHandler.getStreamManager()).thenReturn(sm);
@@ -280,18 +271,15 @@ public class SparkSaslSuite {
 
       ctx = new SaslTestCtx(rpcHandler, true, false, testConf);
 
-      final CountDownLatch lock = new CountDownLatch(1);
+      CountDownLatch lock = new CountDownLatch(1);
 
       ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-      doAnswer(new Answer<Void>() {
-          @Override
-          public Void answer(InvocationOnMock invocation) {
-            response.set((ManagedBuffer) invocation.getArguments()[1]);
-            response.get().retain();
-            lock.countDown();
-            return null;
-          }
-        }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class));
+      doAnswer(invocation -> {
+        response.set((ManagedBuffer) invocation.getArguments()[1]);
+        response.get().retain();
+        lock.countDown();
+        return null;
+      }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class));
 
       ctx.client.fetchChunk(0, 0, callback);
       lock.await(10, TimeUnit.SECONDS);
@@ -388,7 +376,7 @@ public class SparkSaslSuite {
         boolean disableClientEncryption)
       throws Exception {
 
-      this(rpcHandler, encrypt, disableClientEncryption, Collections.<String, String>emptyMap());
+      this(rpcHandler, encrypt, disableClientEncryption, Collections.emptyMap());
     }
 
     SaslTestCtx(
@@ -416,7 +404,7 @@ public class SparkSaslSuite {
         checker));
 
       try {
-        List<TransportClientBootstrap> clientBootstraps = Lists.newArrayList();
+        List<TransportClientBootstrap> clientBootstraps = new ArrayList<>();
         clientBootstraps.add(new SaslClientBootstrap(conf, "user", keyHolder));
         if (disableClientEncryption) {
           clientBootstraps.add(new EncryptionDisablerBootstrap());
@@ -468,11 +456,6 @@ public class SparkSaslSuite {
     }
 
     @Override
-    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
-      super.handlerRemoved(ctx);
-    }
-
-    @Override
     public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
       channel.pipeline().addFirst("encryptionChecker", this);
       return rpcHandler;

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
index d4de4a9..b53e413 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
@@ -28,8 +28,6 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import org.junit.AfterClass;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -52,7 +50,7 @@ public class TransportFrameDecoderSuite {
 
   @Test
   public void testInterception() throws Exception {
-    final int interceptedReads = 3;
+    int interceptedReads = 3;
     TransportFrameDecoder decoder = new TransportFrameDecoder();
     TransportFrameDecoder.Interceptor interceptor = spy(new MockInterceptor(interceptedReads));
     ChannelHandlerContext ctx = mockChannelHandlerContext();
@@ -84,22 +82,19 @@ public class TransportFrameDecoderSuite {
   public void testRetainedFrames() throws Exception {
     TransportFrameDecoder decoder = new TransportFrameDecoder();
 
-    final AtomicInteger count = new AtomicInteger();
-    final List<ByteBuf> retained = new ArrayList<>();
+    AtomicInteger count = new AtomicInteger();
+    List<ByteBuf> retained = new ArrayList<>();
 
     ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-    when(ctx.fireChannelRead(any())).thenAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock in) {
-        // Retain a few frames but not others.
-        ByteBuf buf = (ByteBuf) in.getArguments()[0];
-        if (count.incrementAndGet() % 2 == 0) {
-          retained.add(buf);
-        } else {
-          buf.release();
-        }
-        return null;
+    when(ctx.fireChannelRead(any())).thenAnswer(in -> {
+      // Retain a few frames but not others.
+      ByteBuf buf = (ByteBuf) in.getArguments()[0];
+      if (count.incrementAndGet() % 2 == 0) {
+        retained.add(buf);
+      } else {
+        buf.release();
       }
+      return null;
     });
 
     ByteBuf data = createAndFeedFrames(100, decoder, ctx);
@@ -150,12 +145,6 @@ public class TransportFrameDecoderSuite {
     testInvalidFrame(8);
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testLargeFrame() throws Exception {
-    // Frame length includes the frame size field, so need to add a few more bytes.
-    testInvalidFrame(Integer.MAX_VALUE + 9);
-  }
-
   /**
   * Creates a number of randomly sized frames and feed them to the given decoder, verifying
    * that the frames were read.
@@ -210,13 +199,10 @@ public class TransportFrameDecoderSuite {
 
   private ChannelHandlerContext mockChannelHandlerContext() {
     ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-    when(ctx.fireChannelRead(any())).thenAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock in) {
-        ByteBuf buf = (ByteBuf) in.getArguments()[0];
-        buf.release();
-        return null;
-      }
+    when(ctx.fireChannelRead(any())).thenAnswer(in -> {
+      ByteBuf buf = (ByteBuf) in.getArguments()[0];
+      buf.release();
+      return null;
     });
     return ctx;
   }
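
The same single-abstract-method conversion applies to Mockito's Answer, as this suite and several below show. A minimal hedged sketch (assumes Mockito on the test classpath; stubbing a List is just a convenient stand-in):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;

public class MockitoAnswerSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    List<String> mockList = mock(List.class);

    // Answer has one abstract method, so a lambda replaces the anonymous
    // Answer<T> subclasses used in the pre-Java-8 versions of these tests.
    when(mockList.get(0)).thenAnswer(invocation -> "stubbed:" + invocation.getArguments()[0]);

    System.out.println(mockList.get(0)); // prints stubbed:0
  }
}
```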

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 52f50a3..c0e170e 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -19,11 +19,11 @@ package org.apache.spark.network.sasl;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.collect.Lists;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -38,7 +38,6 @@ import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.ChunkReceivedCallback;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.client.TransportClientBootstrap;
 import org.apache.spark.network.client.TransportClientFactory;
 import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
@@ -105,8 +104,7 @@ public class SaslIntegrationSuite {
   @Test
   public void testGoodClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
-      Lists.<TransportClientBootstrap>newArrayList(
-        new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+        Arrays.asList(new SaslClientBootstrap(conf, "app-1", 
secretKeyHolder)));
 
     TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
     String msg = "Hello, World!";
@@ -120,8 +118,7 @@ public class SaslIntegrationSuite {
     when(badKeyHolder.getSaslUser(anyString())).thenReturn("other-app");
     when(badKeyHolder.getSecretKey(anyString())).thenReturn("wrong-password");
     clientFactory = context.createClientFactory(
-      Lists.<TransportClientBootstrap>newArrayList(
-        new SaslClientBootstrap(conf, "unknown-app", badKeyHolder)));
+        Arrays.asList(new SaslClientBootstrap(conf, "unknown-app", 
badKeyHolder)));
 
     try {
       // Bootstrap should fail on startup.
@@ -134,8 +131,7 @@ public class SaslIntegrationSuite {
 
   @Test
   public void testNoSaslClient() throws IOException, InterruptedException {
-    clientFactory = context.createClientFactory(
-      Lists.<TransportClientBootstrap>newArrayList());
+    clientFactory = context.createClientFactory(new ArrayList<>());
 
     TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
     try {
@@ -159,15 +155,11 @@ public class SaslIntegrationSuite {
     RpcHandler handler = new TestRpcHandler();
     TransportContext context = new TransportContext(conf, handler);
     clientFactory = context.createClientFactory(
-      Lists.<TransportClientBootstrap>newArrayList(
-        new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
-    TransportServer server = context.createServer();
-    try {
+      Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+    try (TransportServer server = context.createServer()) {
       clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
     } catch (Exception e) {
       assertTrue(e.getMessage(), e.getMessage().contains("Digest-challenge format violation"));
-    } finally {
-      server.close();
     }
   }
 
@@ -191,14 +183,13 @@ public class SaslIntegrationSuite {
     try {
       // Create a client, and make a request to fetch blocks from a different app.
       clientFactory = blockServerContext.createClientFactory(
-        Lists.<TransportClientBootstrap>newArrayList(
-          new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+          Arrays.asList(new SaslClientBootstrap(conf, "app-1", 
secretKeyHolder)));
       client1 = clientFactory.createClient(TestUtils.getLocalHost(),
         blockServer.getPort());
 
-      final AtomicReference<Throwable> exception = new AtomicReference<>();
+      AtomicReference<Throwable> exception = new AtomicReference<>();
 
-      final CountDownLatch blockFetchLatch = new CountDownLatch(1);
+      CountDownLatch blockFetchLatch = new CountDownLatch(1);
       BlockFetchingListener listener = new BlockFetchingListener() {
         @Override
         public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
@@ -235,12 +226,11 @@ public class SaslIntegrationSuite {
       // Create a second client, authenticated with a different app ID, and try to read from
       // the stream created for the previous app.
       clientFactory2 = blockServerContext.createClientFactory(
-        Lists.<TransportClientBootstrap>newArrayList(
-          new SaslClientBootstrap(conf, "app-2", secretKeyHolder)));
+          Arrays.asList(new SaslClientBootstrap(conf, "app-2", 
secretKeyHolder)));
       client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
         blockServer.getPort());
 
-      final CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
+      CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
       ChunkReceivedCallback callback = new ChunkReceivedCallback() {
         @Override
         public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
@@ -284,7 +274,7 @@ public class SaslIntegrationSuite {
     }
   }
 
-  private void checkSecurityException(Throwable t) {
+  private static void checkSecurityException(Throwable t) {
     assertNotNull("No exception was caught.", t);
     assertTrue("Expected SecurityException.",
       t.getMessage().contains(SecurityException.class.getName()));

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index c036bc2..e47a72c 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -93,7 +93,7 @@ public class ExternalShuffleBlockHandlerSuite {
 
     ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
     verify(callback, times(1)).onSuccess(response.capture());
-    verify(callback, never()).onFailure((Throwable) any());
+    verify(callback, never()).onFailure(any());
 
     StreamHandle handle =
       (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue());

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 7757500..47c0870 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -60,12 +60,10 @@ public class ExternalShuffleCleanupSuite {
   public void cleanupUsesExecutor() throws IOException {
     TestShuffleDataContext dataContext = createSomeData();
 
-    final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+    AtomicBoolean cleanupCalled = new AtomicBoolean(false);
 
     // Executor which does nothing to ensure we're actually using it.
-    Executor noThreadExecutor = new Executor() {
-      @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
-    };
+    Executor noThreadExecutor = runnable -> cleanupCalled.set(true);
 
     ExternalShuffleBlockResolver manager =
       new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 88de6fb..b8ae04e 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -29,7 +30,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -173,7 +173,7 @@ public class ExternalShuffleIntegrationSuite {
     FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { 
"shuffle_0_0_0" });
     assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks);
     assertTrue(exec0Fetch.failedBlocks.isEmpty());
-    assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks[0]));
+    assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks[0]));
     exec0Fetch.releaseBuffers();
   }
 
@@ -185,7 +185,7 @@ public class ExternalShuffleIntegrationSuite {
     assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", 
"shuffle_0_0_2"),
       exec0Fetch.successBlocks);
     assertTrue(exec0Fetch.failedBlocks.isEmpty());
-    assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks));
+    assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks));
     exec0Fetch.releaseBuffers();
   }
 
@@ -241,7 +241,7 @@ public class ExternalShuffleIntegrationSuite {
     assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), 
execFetch.failedBlocks);
   }
 
-  private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
+  private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
       throws IOException, InterruptedException {
     ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
     client.init(APP_ID);
@@ -249,7 +249,7 @@ public class ExternalShuffleIntegrationSuite {
       executorId, executorInfo);
   }
 
-  private void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
+  private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
     throws Exception {
     assertEquals(list0.size(), list1.size());
     for (int i = 0; i < list0.size(); i ++) {
@@ -257,7 +257,8 @@ public class ExternalShuffleIntegrationSuite {
     }
   }
 
-  private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception {
+  private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1)
+      throws Exception {
     ByteBuffer nio0 = buffer0.nioByteBuffer();
     ByteBuffer nio1 = buffer1.nioByteBuffer();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
index 2590b9c..3e51fea 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.collect.Maps;
 import io.netty.buffer.Unpooled;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -85,8 +83,8 @@ public class OneForOneBlockFetcherSuite {
 
     // Each failure will cause a failure to be invoked in all remaining block fetches.
     verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
-    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) 
any());
-    verify(listener, times(2)).onBlockFetchFailure(eq("b2"), (Throwable) 
any());
+    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
+    verify(listener, times(2)).onBlockFetchFailure(eq("b2"), any());
   }
 
   @Test
@@ -100,15 +98,15 @@ public class OneForOneBlockFetcherSuite {
 
     // We may call both success and failure for the same block.
     verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
-    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) 
any());
+    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
     verify(listener, times(1)).onBlockFetchSuccess("b2", blocks.get("b2"));
-    verify(listener, times(1)).onBlockFetchFailure(eq("b2"), (Throwable) 
any());
+    verify(listener, times(1)).onBlockFetchFailure(eq("b2"), any());
   }
 
   @Test
   public void testEmptyBlockFetch() {
     try {
-      fetchBlocks(Maps.<String, ManagedBuffer>newLinkedHashMap());
+      fetchBlocks(Maps.newLinkedHashMap());
       fail();
     } catch (IllegalArgumentException e) {
       assertEquals("Zero-sized blockIds array", e.getMessage());
@@ -123,52 +121,46 @@ public class OneForOneBlockFetcherSuite {
    *
    * If a block's buffer is "null", an exception will be thrown instead.
    */
-  private BlockFetchingListener fetchBlocks(final LinkedHashMap<String, ManagedBuffer> blocks) {
+  private static BlockFetchingListener fetchBlocks(LinkedHashMap<String, ManagedBuffer> blocks) {
     TransportClient client = mock(TransportClient.class);
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
-    final String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
+    String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
     OneForOneBlockFetcher fetcher =
       new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, 
listener);
 
-    // Respond to the "OpenBlocks" message with an appropirate 
ShuffleStreamHandle with streamId 123
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-        BlockTransferMessage message = 
BlockTransferMessage.Decoder.fromByteBuffer(
-          (ByteBuffer) invocationOnMock.getArguments()[0]);
-        RpcResponseCallback callback = (RpcResponseCallback) 
invocationOnMock.getArguments()[1];
-        callback.onSuccess(new StreamHandle(123, 
blocks.size()).toByteBuffer());
-        assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message);
-        return null;
-      }
+    // Respond to the "OpenBlocks" message with an appropriate 
ShuffleStreamHandle with streamId 123
+    doAnswer(invocationOnMock -> {
+      BlockTransferMessage message = 
BlockTransferMessage.Decoder.fromByteBuffer(
+        (ByteBuffer) invocationOnMock.getArguments()[0]);
+      RpcResponseCallback callback = (RpcResponseCallback) 
invocationOnMock.getArguments()[1];
+      callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer());
+      assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message);
+      return null;
     }).when(client).sendRpc(any(ByteBuffer.class), 
any(RpcResponseCallback.class));
 
     // Respond to each chunk request with a single buffer from our blocks array.
-    final AtomicInteger expectedChunkIndex = new AtomicInteger(0);
-    final Iterator<ManagedBuffer> blockIterator = blocks.values().iterator();
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        try {
-          long streamId = (Long) invocation.getArguments()[0];
-          int myChunkIndex = (Integer) invocation.getArguments()[1];
-          assertEquals(123, streamId);
-          assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex);
-
-          ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2];
-          ManagedBuffer result = blockIterator.next();
-          if (result != null) {
-            callback.onSuccess(myChunkIndex, result);
-          } else {
-            callback.onFailure(myChunkIndex, new RuntimeException("Failed " + 
myChunkIndex));
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          fail("Unexpected failure");
+    AtomicInteger expectedChunkIndex = new AtomicInteger(0);
+    Iterator<ManagedBuffer> blockIterator = blocks.values().iterator();
+    doAnswer(invocation -> {
+      try {
+        long streamId = (Long) invocation.getArguments()[0];
+        int myChunkIndex = (Integer) invocation.getArguments()[1];
+        assertEquals(123, streamId);
+        assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex);
+
+        ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2];
+        ManagedBuffer result = blockIterator.next();
+        if (result != null) {
+          callback.onSuccess(myChunkIndex, result);
+        } else {
+          callback.onFailure(myChunkIndex, new RuntimeException("Failed " + 
myChunkIndex));
         }
-        return null;
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail("Unexpected failure");
       }
-    }).when(client).fetchChunk(anyLong(), anyInt(), (ChunkReceivedCallback) any());
+      return null;
+    }).when(client).fetchChunk(anyLong(), anyInt(), any());
 
     fetcher.start();
     return listener;

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 6db71ee..a530e16 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.stubbing.Stubber;
 
@@ -84,7 +83,7 @@ public class RetryingBlockFetcherSuite {
 
     performInteractions(interactions, listener);
 
-    verify(listener).onBlockFetchFailure(eq("b0"), (Throwable) any());
+    verify(listener).onBlockFetchFailure(eq("b0"), any());
     verify(listener).onBlockFetchSuccess("b1", block1);
     verifyNoMoreInteractions(listener);
   }
@@ -190,7 +189,7 @@ public class RetryingBlockFetcherSuite {
     performInteractions(interactions, listener);
 
     verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
-    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) 
any());
+    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
     verifyNoMoreInteractions(listener);
   }
 
@@ -220,7 +219,7 @@ public class RetryingBlockFetcherSuite {
     performInteractions(interactions, listener);
 
     verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
-    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) 
any());
+    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
     verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
     verifyNoMoreInteractions(listener);
   }
@@ -249,40 +248,37 @@ public class RetryingBlockFetcherSuite {
     Stubber stub = null;
 
     // Contains all blockIds that are referenced across all interactions.
-    final LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();
+    LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();
 
-    for (final Map<String, Object> interaction : interactions) {
+    for (Map<String, Object> interaction : interactions) {
       blockIds.addAll(interaction.keySet());
 
-      Answer<Void> answer = new Answer<Void>() {
-        @Override
-        public Void answer(InvocationOnMock invocationOnMock) throws Throwable 
{
-          try {
-            // Verify that the RetryingBlockFetcher requested the expected 
blocks.
-            String[] requestedBlockIds = (String[]) 
invocationOnMock.getArguments()[0];
-            String[] desiredBlockIds = interaction.keySet().toArray(new 
String[interaction.size()]);
-            assertArrayEquals(desiredBlockIds, requestedBlockIds);
-
-            // Now actually invoke the success/failure callbacks on each block.
-            BlockFetchingListener retryListener =
-              (BlockFetchingListener) invocationOnMock.getArguments()[1];
-            for (Map.Entry<String, Object> block : interaction.entrySet()) {
-              String blockId = block.getKey();
-              Object blockValue = block.getValue();
-
-              if (blockValue instanceof ManagedBuffer) {
-                retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) 
blockValue);
-              } else if (blockValue instanceof Exception) {
-                retryListener.onBlockFetchFailure(blockId, (Exception) 
blockValue);
-              } else {
-                fail("Can only handle ManagedBuffers and Exceptions, got " + 
blockValue);
-              }
+      Answer<Void> answer = invocationOnMock -> {
+        try {
+          // Verify that the RetryingBlockFetcher requested the expected blocks.
+          String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
+          String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
+          assertArrayEquals(desiredBlockIds, requestedBlockIds);
+
+          // Now actually invoke the success/failure callbacks on each block.
+          BlockFetchingListener retryListener =
+            (BlockFetchingListener) invocationOnMock.getArguments()[1];
+          for (Map.Entry<String, Object> block : interaction.entrySet()) {
+            String blockId = block.getKey();
+            Object blockValue = block.getValue();
+
+            if (blockValue instanceof ManagedBuffer) {
+              retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue);
+            } else if (blockValue instanceof Exception) {
+              retryListener.onBlockFetchFailure(blockId, (Exception) blockValue);
+            } else {
+              fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue);
             }
-            return null;
-          } catch (Throwable e) {
-            e.printStackTrace();
-            throw e;
           }
+          return null;
+        } catch (Throwable e) {
+          e.printStackTrace();
+          throw e;
         }
       };
 
@@ -295,7 +291,7 @@ public class RetryingBlockFetcherSuite {
     }
 
     assertNotNull(stub);
-    stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject());
+    stub.when(fetchStarter).createAndStart(any(), anyObject());
     String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
     new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
   }
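
The rewrite above leans on two Java 8 features: org.mockito.stubbing.Answer has a single abstract method, so the anonymous class collapses to a lambda, and the loop variable `interaction` no longer needs an explicit `final` modifier because lambdas may capture any effectively final local. A minimal standalone sketch of the same pattern (assuming only Mockito on the classpath; the List mock and names are illustrative, not part of this commit):

    import static org.mockito.Mockito.*;

    import java.util.List;

    public class AnswerLambdaSketch {
      public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        List<String> list = mock(List.class);

        for (String prefix : new String[] {"a", "b"}) {
          // 'prefix' is effectively final, so the lambda can capture it
          // without the explicit 'final' an anonymous class used to need.
          when(list.get(anyInt())).thenAnswer(invocation ->
              prefix + "-" + invocation.getArguments()[0]);
        }
        System.out.println(list.get(7));  // "b-7": the later stubbing wins
      }
    }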

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 189d607..29aca04 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -37,7 +37,6 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.UnsafeAlignedOffset;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.TaskCompletionListener;
 import org.apache.spark.util.Utils;
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
index 7fe452a..a6589d2 100644
--- a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
+++ b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
@@ -20,14 +20,11 @@ import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.rdd.JdbcRDD;
 import org.junit.After;
 import org.junit.Assert;
@@ -89,30 +86,13 @@ public class JavaJdbcRDDSuite implements Serializable {
   public void testJavaJdbcRDD() throws Exception {
     JavaRDD<Integer> rdd = JdbcRDD.create(
       sc,
-      new JdbcRDD.ConnectionFactory() {
-        @Override
-        public Connection getConnection() throws SQLException {
-          return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
-        }
-      },
+      () -> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"),
       "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
       1, 100, 1,
-      new Function<ResultSet, Integer>() {
-        @Override
-        public Integer call(ResultSet r) throws Exception {
-          return r.getInt(1);
-        }
-      }
+      r -> r.getInt(1)
     ).cache();
 
     Assert.assertEquals(100, rdd.count());
-    Assert.assertEquals(
-      Integer.valueOf(10100),
-      rdd.reduce(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      }));
+    Assert.assertEquals(Integer.valueOf(10100), rdd.reduce((i1, i2) -> i1 + i2));
   }
 }
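
Each call site above shrinks because JdbcRDD.ConnectionFactory, Function, and Function2 are all single-method interfaces whose target type the compiler infers. A self-contained sketch of the same reduce conversion against a local-mode context (a sketch only; the local master and class name are illustrative):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LambdaReduceSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("LambdaReduceSketch");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
          JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
          // Function2<Integer, Integer, Integer> collapses to a two-argument
          // lambda; reduce's signature supplies the target type.
          int sum = rdd.reduce((i1, i2) -> i1 + i2);
          System.out.println(sum);  // 10
        }
      }
    }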

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 088b681..24a55df 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -34,8 +34,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.ShuffleDependency;
@@ -119,9 +117,7 @@ public class UnsafeShuffleWriterSuite {
       any(File.class),
       any(SerializerInstance.class),
       anyInt(),
-      any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
-      @Override
-      public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+      any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
         Object[] args = invocationOnMock.getArguments();
         return new DiskBlockObjectWriter(
           (File) args[1],
@@ -132,33 +128,24 @@ public class UnsafeShuffleWriterSuite {
           (ShuffleWriteMetrics) args[4],
           (BlockId) args[0]
         );
-      }
-    });
+      });
 
     when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-        partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
-        File tmp = (File) invocationOnMock.getArguments()[3];
-        mergedOutputFile.delete();
-        tmp.renameTo(mergedOutputFile);
-        return null;
-      }
+    doAnswer(invocationOnMock -> {
+      partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
+      File tmp = (File) invocationOnMock.getArguments()[3];
+      mergedOutputFile.delete();
+      tmp.renameTo(mergedOutputFile);
+      return null;
     }).when(shuffleBlockResolver)
       .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class));
 
-    when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
-      new Answer<Tuple2<TempShuffleBlockId, File>>() {
-        @Override
-        public Tuple2<TempShuffleBlockId, File> answer(
-          InvocationOnMock invocationOnMock) throws Throwable {
-          TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
-          File file = File.createTempFile("spillFile", ".spill", tempDir);
-          spillFilesCreated.add(file);
-          return Tuple2$.MODULE$.apply(blockId, file);
-        }
-      });
+    when(diskBlockManager.createTempShuffleBlock()).thenAnswer(invocationOnMock -> {
+      TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
+      File file = File.createTempFile("spillFile", ".spill", tempDir);
+      spillFilesCreated.add(file);
+      return Tuple2$.MODULE$.apply(blockId, file);
+    });
 
     when(taskContext.taskMetrics()).thenReturn(taskMetrics);
     when(shuffleDep.serializer()).thenReturn(serializer);
@@ -243,7 +230,7 @@ public class UnsafeShuffleWriterSuite {
   @Test
   public void writeEmptyIterator() throws Exception {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
-    writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
+    writer.write(Iterators.emptyIterator());
     final Option<MapStatus> mapStatus = writer.stop(true);
     assertTrue(mapStatus.isDefined());
     assertTrue(mergedOutputFile.exists());
@@ -259,7 +246,7 @@ public class UnsafeShuffleWriterSuite {
     // In this example, each partition should have exactly one record:
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     for (int i = 0; i < NUM_PARTITITONS; i++) {
-      dataToWrite.add(new Tuple2<Object, Object>(i, i));
+      dataToWrite.add(new Tuple2<>(i, i));
     }
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
     writer.write(dataToWrite.iterator());
@@ -315,7 +302,7 @@ public class UnsafeShuffleWriterSuite {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(transferToEnabled);
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     for (int i : new int[] { 1, 2, 3, 4, 4, 2 }) {
-      dataToWrite.add(new Tuple2<Object, Object>(i, i));
+      dataToWrite.add(new Tuple2<>(i, i));
     }
     writer.insertRecordIntoSorter(dataToWrite.get(0));
     writer.insertRecordIntoSorter(dataToWrite.get(1));
@@ -424,7 +411,7 @@ public class UnsafeShuffleWriterSuite {
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 10];
     for (int i = 0; i < 10 + 1; i++) {
-      dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray));
+      dataToWrite.add(new Tuple2<>(i, bigByteArray));
     }
     writer.write(dataToWrite.iterator());
     assertEquals(2, spillFilesCreated.size());
@@ -458,7 +445,7 @@ public class UnsafeShuffleWriterSuite {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) {
-      dataToWrite.add(new Tuple2<Object, Object>(i, i));
+      dataToWrite.add(new Tuple2<>(i, i));
     }
     writer.write(dataToWrite.iterator());
     writer.stop(true);
@@ -478,7 +465,7 @@ public class UnsafeShuffleWriterSuite {
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
     final byte[] bytes = new byte[(int) (ShuffleExternalSorter.DISK_WRITE_BUFFER_SIZE * 2.5)];
     new Random(42).nextBytes(bytes);
-    dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(bytes)));
+    dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(bytes)));
     writer.write(dataToWrite.iterator());
     writer.stop(true);
     assertEquals(
@@ -491,15 +478,15 @@ public class UnsafeShuffleWriterSuite {
   public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
     final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
-    dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(new byte[1])));
+    dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(new byte[1])));
     // We should be able to write a record that's right _at_ the max record size
     final byte[] atMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes() - 4];
     new Random(42).nextBytes(atMaxRecordSize);
-    dataToWrite.add(new Tuple2<Object, Object>(2, ByteBuffer.wrap(atMaxRecordSize)));
+    dataToWrite.add(new Tuple2<>(2, ByteBuffer.wrap(atMaxRecordSize)));
     // Inserting a record that's larger than the max record size
     final byte[] exceedsMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes()];
     new Random(42).nextBytes(exceedsMaxRecordSize);
-    dataToWrite.add(new Tuple2<Object, Object>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
+    dataToWrite.add(new Tuple2<>(3, ByteBuffer.wrap(exceedsMaxRecordSize)));
     writer.write(dataToWrite.iterator());
     writer.stop(true);
     assertEquals(
@@ -511,10 +498,10 @@ public class UnsafeShuffleWriterSuite {
   @Test
   public void spillFilesAreDeletedWhenStoppingAfterError() throws IOException {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
-    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
-    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+    writer.insertRecordIntoSorter(new Tuple2<>(1, 1));
+    writer.insertRecordIntoSorter(new Tuple2<>(2, 2));
     writer.forceSorterToSpill();
-    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+    writer.insertRecordIntoSorter(new Tuple2<>(2, 2));
     writer.stop(false);
     assertSpillFilesWereCleanedUp();
   }
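
Two idioms recur in this hunk: doAnswer(...).when(mock) is the stubbing form for void methods such as writeIndexFileAndCommit, and the Answer lambda must still return a value, hence the trailing `return null;`. The `new Tuple2<>(i, i)` changes are just the diamond operator, with type arguments inferred from the declared element type. A minimal sketch of the doAnswer shape (Mockito assumed on the classpath; the List mock is illustrative):

    import static org.mockito.Mockito.*;

    import java.util.List;

    public class DoAnswerLambdaSketch {
      public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        List<String> list = mock(List.class);
        StringBuilder log = new StringBuilder();

        // doAnswer stubs a void method; Answer's single method returns
        // Object, so the lambda body ends with 'return null'.
        doAnswer(invocation -> {
          log.append("cleared");
          return null;
        }).when(list).clear();

        list.clear();
        System.out.println(log);  // cleared
      }
    }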

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 2656814..03cec8e 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import scala.Tuple2;
 import scala.Tuple2$;
 
 import org.junit.After;
@@ -31,8 +30,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -88,25 +85,18 @@ public abstract class AbstractBytesToBytesMapSuite {
     spillFilesCreated.clear();
     MockitoAnnotations.initMocks(this);
     when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
-    when(diskBlockManager.createTempLocalBlock()).thenAnswer(
-        new Answer<Tuple2<TempLocalBlockId, File>>() {
-      @Override
-      public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
-        File file = File.createTempFile("spillFile", ".spill", tempDir);
-        spillFilesCreated.add(file);
-        return Tuple2$.MODULE$.apply(blockId, file);
-      }
+    when(diskBlockManager.createTempLocalBlock()).thenAnswer(invocationOnMock -> {
+      TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
+      File file = File.createTempFile("spillFile", ".spill", tempDir);
+      spillFilesCreated.add(file);
+      return Tuple2$.MODULE$.apply(blockId, file);
     });
     when(blockManager.getDiskWriter(
       any(BlockId.class),
       any(File.class),
       any(SerializerInstance.class),
       anyInt(),
-      any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
-      @Override
-      public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+      any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
         Object[] args = invocationOnMock.getArguments();
 
         return new DiskBlockObjectWriter(
@@ -118,8 +108,7 @@ public abstract class AbstractBytesToBytesMapSuite {
           (ShuffleWriteMetrics) args[4],
           (BlockId) args[0]
         );
-      }
-    });
+      });
   }
 
   @After
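
Tuple2$.MODULE$ is how Java code reaches scala.Tuple2's companion object, and with the explicit Answer<Tuple2<TempLocalBlockId, File>> type gone, the lambda's return type is inferred at the thenAnswer call site. A tiny sketch of the companion-object call (assuming scala-library on the classpath; the class name is illustrative):

    import scala.Tuple2;
    import scala.Tuple2$;

    public class TupleCompanionSketch {
      public static void main(String[] args) {
        // Tuple2$.MODULE$ is the singleton companion; apply builds the pair.
        Tuple2<String, Integer> viaCompanion = Tuple2$.MODULE$.apply("answer", 42);
        // The direct constructor with the diamond operator is equivalent.
        Tuple2<String, Integer> viaConstructor = new Tuple2<>("answer", 42);
        System.out.println(viaCompanion.equals(viaConstructor));  // true
      }
    }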

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index fbbe530..771d390 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.UUID;
 
-import scala.Tuple2;
 import scala.Tuple2$;
 
 import org.junit.After;
@@ -31,8 +30,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
@@ -96,25 +93,18 @@ public class UnsafeExternalSorterSuite {
     taskContext = mock(TaskContext.class);
     when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
     when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
-    when(diskBlockManager.createTempLocalBlock()).thenAnswer(
-        new Answer<Tuple2<TempLocalBlockId, File>>() {
-      @Override
-      public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
-        File file = File.createTempFile("spillFile", ".spill", tempDir);
-        spillFilesCreated.add(file);
-        return Tuple2$.MODULE$.apply(blockId, file);
-      }
+    when(diskBlockManager.createTempLocalBlock()).thenAnswer(invocationOnMock -> {
+      TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
+      File file = File.createTempFile("spillFile", ".spill", tempDir);
+      spillFilesCreated.add(file);
+      return Tuple2$.MODULE$.apply(blockId, file);
     });
     when(blockManager.getDiskWriter(
       any(BlockId.class),
       any(File.class),
       any(SerializerInstance.class),
       anyInt(),
-      any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() {
-      @Override
-      public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable {
+      any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> {
         Object[] args = invocationOnMock.getArguments();
 
         return new DiskBlockObjectWriter(
@@ -126,8 +116,7 @@ public class UnsafeExternalSorterSuite {
           (ShuffleWriteMetrics) args[4],
           (BlockId) args[0]
         );
-      }
-    });
+      });
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
index e22ad89..1d2b05e 100644
--- a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
@@ -64,12 +64,7 @@ public class Java8RDDAPISuite implements Serializable {
   public void foreachWithAnonymousClass() {
     foreachCalls = 0;
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String s) {
-        foreachCalls++;
-      }
-    });
+    rdd.foreach(s -> foreachCalls++);
     Assert.assertEquals(2, foreachCalls);
   }
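
The counter keeps working after this rewrite because `foreachCalls` is an instance field: the lambda captures `this`, not the field itself, so mutation is allowed, whereas a mutable local variable would be rejected by the compiler. A sketch contrasting the two kinds of capture with plain java.util collections (illustrative only, no Spark required):

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicInteger;

    public class CaptureSketch {
      private int fieldCounter = 0;  // fields may be mutated from a lambda

      void run() {
        // A captured local must be effectively final, so mutable local
        // state has to live in a holder object such as AtomicInteger.
        AtomicInteger localCounter = new AtomicInteger();
        Arrays.asList("Hello", "World").forEach(s -> {
          fieldCounter++;              // legal: captures 'this'
          localCounter.incrementAndGet();
        });
        System.out.println(fieldCounter + " " + localCounter.get());  // 2 2
      }

      public static void main(String[] args) {
        new CaptureSketch().run();
      }
    }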
 

