ACCUMULO-4317 Refactor TTimeoutTransport to be able to better test it
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7bd9c088 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7bd9c088 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7bd9c088 Branch: refs/heads/master Commit: 7bd9c0882efec7d42b83870b3cc3436471359472 Parents: bfc2a5b Author: Josh Elser <els...@apache.org> Authored: Mon Jun 20 22:44:55 2016 -0400 Committer: Josh Elser <els...@apache.org> Committed: Mon Jun 20 22:44:55 2016 -0400 ---------------------------------------------------------------------- .../accumulo/core/rpc/TTimeoutTransport.java | 146 ++++++++++++++++-- .../core/rpc/TTimeoutTransportTest.java | 151 +++++++++++++++++++ 2 files changed, 286 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bd9c088/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java index cc3f51b..809975f 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java @@ -30,16 +30,31 @@ import java.nio.channels.spi.SelectorProvider; import org.apache.hadoop.net.NetUtils; import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.net.HostAndPort; +/** + * A utility class for setting up a {@link TTransport} with various necessary configurations for ideal performance in Accumulo. These configurations include: + * <ul> + * <li>Setting SO_LINGER=false on the socket.</li> + * <li>Setting TCP_NO_DELAY=true on the socket.</li> + * <li>Setting timeouts on the I/OStreams.</li> + * </ul> + */ public class TTimeoutTransport { + private static final Logger log = LoggerFactory.getLogger(TTimeoutTransport.class); + + private static final TTimeoutTransport INSTANCE = new TTimeoutTransport(); + + private volatile Method GET_INPUT_STREAM_METHOD = null; - private static volatile Method GET_INPUT_STREAM_METHOD = null; + private TTimeoutTransport() {} - private static Method getNetUtilsInputStreamMethod() { + private Method getNetUtilsInputStreamMethod() { if (null == GET_INPUT_STREAM_METHOD) { - synchronized (TTimeoutTransport.class) { + synchronized (this) { if (null == GET_INPUT_STREAM_METHOD) { try { GET_INPUT_STREAM_METHOD = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE); @@ -53,35 +68,144 @@ public class TTimeoutTransport { return GET_INPUT_STREAM_METHOD; } - private static InputStream getInputStream(Socket socket, long timeout) { + /** + * Invokes the <code>NetUtils.getInputStream(Socket, long)</code> using reflection to handle compatibility with both Hadoop 1 and 2. + * + * @param socket + * The socket to create the input stream on + * @param timeout + * The timeout for the input stream in milliseconds + * @return An InputStream on the socket + */ + private InputStream getInputStream(Socket socket, long timeout) throws IOException { try { return (InputStream) getNetUtilsInputStreamMethod().invoke(null, socket, timeout); } catch (Exception e) { - throw new RuntimeException(e); + Throwable cause = e.getCause(); + // Try to re-throw the IOException directly + if (cause instanceof IOException) { + throw (IOException) cause; + } + + if (e instanceof RuntimeException) { + // Don't re-wrap another RTE around an RTE + throw (RuntimeException) e; + } else { + throw new RuntimeException(e); + } } } + /** + * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown. + * + * @param addr + * The address to connect the client to + * @param timeoutMillis + * The timeout in milliseconds for the connection + * @return A TTransport connected to the given <code>addr</code> + * @throws IOException + * If the transport fails to be created/connected + */ public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException { - return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis); + return INSTANCE.createInternal(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis); } + /** + * Creates a Thrift TTransport to the given address with the given timeout. All created resources are closed if an exception is thrown. + * + * @param addr + * The address to connect the client to + * @param timeoutMillis + * The timeout in milliseconds for the connection + * @return A TTransport connected to the given <code>addr</code> + * @throws IOException + * If the transport fails to be created/connected + */ public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException { + return INSTANCE.createInternal(addr, timeoutMillis); + } + + /** + * Opens a socket to the given <code>addr</code>, configures the socket, and then creates a Thrift transport using the socket. + * + * @param addr + * The address the socket should connect + * @param timeoutMillis + * The socket timeout in milliseconds + * @return A TTransport instance to the given <code>addr</code> + * @throws IOException + * If the Thrift client is failed to be connected/created + */ + protected TTransport createInternal(SocketAddress addr, long timeoutMillis) throws IOException { Socket socket = null; try { - socket = SelectorProvider.provider().openSocketChannel().socket(); + socket = openSocket(addr); + } catch (IOException e) { + // openSocket handles closing the Socket on error + throw e; + } + + // Should be non-null + assert null != socket; + + // Set up the streams + try { + InputStream input = wrapInputStream(socket, timeoutMillis); + OutputStream output = wrapOutputStream(socket, timeoutMillis); + return new TIOStreamTransport(input, output); + } catch (IOException e) { + try { + socket.close(); + } catch (IOException ioe) { + log.error("Failed to close socket after unsuccessful I/O stream setup", e); + } + + throw e; + } + } + + // Visible for testing + protected InputStream wrapInputStream(Socket socket, long timeoutMillis) throws IOException { + return new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10); + } + + // Visible for testing + protected OutputStream wrapOutputStream(Socket socket, long timeoutMillis) throws IOException { + return new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10); + } + + /** + * Opens and configures a {@link Socket} for Accumulo RPC. + * + * @param addr + * The address to connect the socket to + * @return A socket connected to the given address, or null if the socket fails to connect + */ + protected Socket openSocket(SocketAddress addr) throws IOException { + Socket socket = null; + try { + socket = openSocketChannel(); socket.setSoLinger(false, 0); socket.setTcpNoDelay(true); socket.connect(addr); - InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10); - OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10); - return new TIOStreamTransport(input, output); + return socket; } catch (IOException e) { try { if (socket != null) socket.close(); - } catch (IOException ioe) {} + } catch (IOException ioe) { + log.error("Failed to close socket after unsuccessful open.", e); + } throw e; } } + + /** + * Opens a socket channel and returns the underlying socket. + */ + protected Socket openSocketChannel() throws IOException { + return SelectorProvider.provider().openSocketChannel().socket(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bd9c088/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java new file mode 100644 index 0000000..cedac9c --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/rpc/TTimeoutTransportTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.rpc; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.net.SocketAddress; + +import org.junit.Test; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.createMockBuilder; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.fail; + +/** + * Tests for {@link TTimeoutTransport}. + */ +public class TTimeoutTransportTest { + + void expectedSocketSetup(Socket s) throws IOException { + s.setSoLinger(false, 0); + expectLastCall().once(); + s.setTcpNoDelay(true); + expectLastCall().once(); + } + + @Test + public void testFailedSocketOpenIsClosed() throws IOException { + SocketAddress addr = createMock(SocketAddress.class); + Socket s = createMock(Socket.class); + TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").createMock(); + + // Return out mocked socket + expect(timeoutTransport.openSocketChannel()).andReturn(s).once(); + + // tcpnodelay and solinger + expectedSocketSetup(s); + + // Connect to the addr + s.connect(addr); + expectLastCall().andThrow(new IOException()); + + // The socket should be closed after the above IOException + s.close(); + + replay(addr, s, timeoutTransport); + + try { + timeoutTransport.openSocket(addr); + fail("Expected to catch IOException but got none"); + } catch (IOException e) { + // Expected + } + + verify(addr, s, timeoutTransport); + } + + @Test + public void testFailedInputStreamClosesSocket() throws IOException { + long timeout = 2 * 60 * 1000; // 2 mins + SocketAddress addr = createMock(SocketAddress.class); + Socket s = createMock(Socket.class); + TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").addMockedMethod("wrapInputStream") + .createMock(); + + // Return out mocked socket + expect(timeoutTransport.openSocketChannel()).andReturn(s).once(); + + // tcpnodelay and solinger + expectedSocketSetup(s); + + // Connect to the addr + s.connect(addr); + expectLastCall().once(); + + expect(timeoutTransport.wrapInputStream(s, timeout)).andThrow(new IOException()); + + // The socket should be closed after the above IOException + s.close(); + + replay(addr, s, timeoutTransport); + + try { + timeoutTransport.createInternal(addr, timeout); + fail("Expected to catch IOException but got none"); + } catch (IOException e) { + // Expected + } + + verify(addr, s, timeoutTransport); + } + + @Test + public void testFailedOutputStreamClosesSocket() throws IOException { + long timeout = 2 * 60 * 1000; // 2 mins + SocketAddress addr = createMock(SocketAddress.class); + Socket s = createMock(Socket.class); + InputStream is = createMock(InputStream.class); + TTimeoutTransport timeoutTransport = createMockBuilder(TTimeoutTransport.class).addMockedMethod("openSocketChannel").addMockedMethod("wrapInputStream") + .addMockedMethod("wrapOutputStream").createMock(); + + // Return out mocked socket + expect(timeoutTransport.openSocketChannel()).andReturn(s).once(); + + // tcpnodelay and solinger + expectedSocketSetup(s); + + // Connect to the addr + s.connect(addr); + expectLastCall().once(); + + // Input stream is set up + expect(timeoutTransport.wrapInputStream(s, timeout)).andReturn(is); + // Output stream fails to be set up + expect(timeoutTransport.wrapOutputStream(s, timeout)).andThrow(new IOException()); + + // The socket should be closed after the above IOException + s.close(); + + replay(addr, s, timeoutTransport); + + try { + timeoutTransport.createInternal(addr, timeout); + fail("Expected to catch IOException but got none"); + } catch (IOException e) { + // Expected + } + + verify(addr, s, timeoutTransport); + } + +}