This is an automated email from the ASF dual-hosted git repository. tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
commit c87c681ce9ab09c5bc30859b812e3d3ef3284f56 Author: Thomas Vandahl <[email protected]> AuthorDate: Fri Mar 26 22:54:21 2021 +0100 Use NIO channels --- .../lateral/socket/tcp/LateralTCPSender.java | 77 ++++-------- .../jcs3/engine/behavior/IElementSerializer.java | 133 +++++++++++++++++++-- 2 files changed, 147 insertions(+), 63 deletions(-) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java index b73bf1f..64a319e 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java @@ -20,10 +20,12 @@ package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp; */ import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; -import java.net.Socket; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -50,8 +52,8 @@ public class LateralTCPSender /** The serializer. */ private IElementSerializer serializer; - /** The socket connection with the server. */ - private Socket socket; + /** The client connection with the server. 
*/ + private AsynchronousSocketChannel client; /** how many messages sent */ private int sendCnt; @@ -108,32 +110,22 @@ public class LateralTCPSender protected void init( final String host, final int port ) throws IOException { + log.info( "Attempting connection to [{0}:{1}]", host, port ); + try { - log.info( "Attempting connection to [{0}]", host ); - - try - { - this.socket = new Socket(); - this.socket.connect(new InetSocketAddress(host, port), this.socketOpenTimeOut); - } - catch ( final IOException ioe ) - { - throw new IOException( "Cannot connect to " + host + ":" + port, ioe ); - } - - socket.setSoTimeout( socketSoTimeOut ); - } - catch ( final java.net.ConnectException e ) - { - log.debug( "Remote host [{0}] refused connection.", host ); - throw e; + client = AsynchronousSocketChannel.open(); + InetSocketAddress hostAddress = new InetSocketAddress(host, port); + Future<Void> future = client.connect(hostAddress); + + future.get(this.socketOpenTimeOut, TimeUnit.MILLISECONDS); } - catch ( final IOException e ) + catch (final IOException | InterruptedException | ExecutionException | TimeoutException ioe) { - log.debug( "Could not connect to [{0}]", host, e ); - throw e; + throw new IOException( "Cannot connect to " + host + ":" + port, ioe ); } + + // socket.setSoTimeout( socketSoTimeOut ); } /** @@ -148,7 +140,7 @@ public class LateralTCPSender sendCnt++; if ( log.isInfoEnabled() && sendCnt % 100 == 0 ) { - log.info( "Send Count (port {0}) = {1}", socket.getPort(), sendCnt ); + log.info( "Send Count {0} = {1}", client.getRemoteAddress(), sendCnt ); } log.debug( "sending LateralElementDescriptor" ); @@ -158,13 +150,10 @@ public class LateralTCPSender return; } - final OutputStream sos = socket.getOutputStream(); - lock.lock(); try { - serializer.serializeTo(led, sos); - sos.flush(); + serializer.serializeTo(led, client, socketSoTimeOut); } finally { @@ -201,31 +190,15 @@ public class LateralTCPSender lock.lock(); try { - try - { - // clean up input stream, 
nothing should be there yet. - if ( socket.getInputStream().available() > 0 ) - { - socket.getInputStream().read( new byte[socket.getInputStream().available()] ); - } - } - catch ( final IOException ioe ) - { - log.error( "Problem cleaning socket before send {0}", socket, ioe ); - throw ioe; - } - // write object to listener send(led); - - InputStream sis = socket.getInputStream(); - response = serializer.deSerializeFrom(sis, null); + response = serializer.deSerializeFrom(client, socketSoTimeOut, null); } catch ( final IOException | ClassNotFoundException ioe ) { - final String message = "Could not open InputStream to " + socket + - " SoTimeout [" + socket.getSoTimeout() + - "] Connected [" + socket.isConnected() + "]"; + final String message = "Could not open InputStream to " + + client.getRemoteAddress() + " SoTimeout [" + socketSoTimeOut + + "] Connected [" + client.isOpen() + "]"; log.error( message, ioe ); throw new IOException(message, ioe); } @@ -248,6 +221,6 @@ public class LateralTCPSender throws IOException { log.info( "Dispose called" ); - socket.close(); + client.close(); } } diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/behavior/IElementSerializer.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/behavior/IElementSerializer.java index ce72212..56fc2f5 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/behavior/IElementSerializer.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/behavior/IElementSerializer.java @@ -24,8 +24,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousByteChannel; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * 
Defines the behavior for cache element serializers. This layer of abstraction allows us to plug @@ -59,8 +64,8 @@ public interface IElementSerializer throws IOException, ClassNotFoundException; /** - * Convenience method to write serialized object into a stream - * The stream data will be prepended with a four-byte length prefix + * Convenience method to write serialized object into a stream. + * The stream data will be prepended with a four-byte length prefix. * * @param <T> the type of the object * @param obj the object to serialize @@ -82,8 +87,8 @@ public interface IElementSerializer } /** - * Convenience method to write serialized object into a channel - * The stream data will be prepended with a four-byte length prefix + * Convenience method to write serialized object into a channel. + * The stream data will be prepended with a four-byte length prefix. * * @param <T> the type of the object * @param obj the object to serialize @@ -100,12 +105,54 @@ public interface IElementSerializer buffer.put(serialized); buffer.flip(); - oc.write(buffer); - return buffer.capacity(); + int count = 0; + while (buffer.hasRemaining()) + { + count += oc.write(buffer); + } + return count; } /** - * Convenience method to read serialized object from a stream + * Convenience method to write serialized object into an + * asynchronous channel. + * The stream data will be prepended with a four-byte length prefix. 
+ * + * @param <T> the type of the object + * @param obj the object to serialize + * @param oc the output channel + * @param writeTimeoutMs the write timeout in milliseconds + * @return the number of bytes written + * @throws IOException if serialization or writing fails + */ + default <T> int serializeTo(T obj, AsynchronousByteChannel oc, int writeTimeoutMs) + throws IOException + { + final byte[] serialized = serialize(obj); + final ByteBuffer buffer = ByteBuffer.allocate(4 + serialized.length); + buffer.putInt(serialized.length); + buffer.put(serialized); + buffer.flip(); + + int count = 0; + while (buffer.hasRemaining()) + { + Future<Integer> bytesWritten = oc.write(buffer); + try + { + count += bytesWritten.get(writeTimeoutMs, TimeUnit.MILLISECONDS); + } + catch (InterruptedException | ExecutionException | TimeoutException e) + { + throw new IOException("Write timeout exceeded " + writeTimeoutMs, e); + } + } + + return count; + } + + /** + * Convenience method to read serialized object from a stream. * The method expects to find a four-byte length prefix in the * stream data. * @@ -135,7 +182,7 @@ public interface IElementSerializer } /** - * Convenience method to read serialized object from a channel + * Convenience method to read serialized object from a channel. * The method expects to find a four-byte length prefix in the * stream data. 
* @@ -152,14 +199,78 @@ public interface IElementSerializer int read = ic.read(bufferSize); if (read < 0) { - throw new EOFException("End of stream reached"); + throw new EOFException("End of stream reached (length)"); } assert read == bufferSize.capacity(); bufferSize.flip(); final ByteBuffer serialized = ByteBuffer.allocate(bufferSize.getInt()); - read = ic.read(serialized); - assert read == serialized.capacity(); + while (serialized.remaining() > 0) + { + read = ic.read(serialized); + if (read < 0) + { + throw new EOFException("End of stream reached (object)"); + } + } + serialized.flip(); + + return deSerialize(serialized.array(), loader); + } + + /** + * Convenience method to read serialized object from an + * asynchronous channel. + * The method expects to find a four-byte length prefix in the + * stream data. + * + * @param <T> the type of the object + * @param ic the input channel + * @param readTimeoutMs the read timeout in milliseconds + * @param loader class loader to use + * @throws IOException if serialization or reading fails + * @throws ClassNotFoundException thrown if we don't know the object. 
+ */ + default <T> T deSerializeFrom(AsynchronousByteChannel ic, int readTimeoutMs, ClassLoader loader) + throws IOException, ClassNotFoundException + { + final ByteBuffer bufferSize = ByteBuffer.allocate(4); + Future<Integer> readFuture = ic.read(bufferSize); + + try + { + int read = readFuture.get(readTimeoutMs, TimeUnit.MILLISECONDS); + if (read < 0) + { + throw new EOFException("End of stream reached (length)"); + } + assert read == bufferSize.capacity(); + } + catch (InterruptedException | ExecutionException | TimeoutException e) + { + throw new IOException("Read timeout exceeded (length)" + readTimeoutMs, e); + } + + bufferSize.flip(); + + final ByteBuffer serialized = ByteBuffer.allocate(bufferSize.getInt()); + while (serialized.remaining() > 0) + { + readFuture = ic.read(serialized); + try + { + int read = readFuture.get(readTimeoutMs, TimeUnit.MILLISECONDS); + if (read < 0) + { + throw new EOFException("End of stream reached (object)"); + } + } + catch (InterruptedException | ExecutionException | TimeoutException e) + { + throw new IOException("Read timeout exceeded (object)" + readTimeoutMs, e); + } + } + serialized.flip(); return deSerialize(serialized.array(), loader);
