This is an automated email from the ASF dual-hosted git repository.

tv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-jcs.git

commit c87c681ce9ab09c5bc30859b812e3d3ef3284f56
Author: Thomas Vandahl <[email protected]>
AuthorDate: Fri Mar 26 22:54:21 2021 +0100

    Use NIO channels
---
 .../lateral/socket/tcp/LateralTCPSender.java       |  77 ++++--------
 .../jcs3/engine/behavior/IElementSerializer.java   | 133 +++++++++++++++++++--
 2 files changed, 147 insertions(+), 63 deletions(-)

diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
index b73bf1f..64a319e 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java
@@ -20,10 +20,12 @@ package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
  */
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -50,8 +52,8 @@ public class LateralTCPSender
     /** The serializer. */
     private IElementSerializer serializer;
 
-    /** The socket connection with the server. */
-    private Socket socket;
+    /** The client connection with the server. */
+    private AsynchronousSocketChannel client;
 
     /** how many messages sent */
     private int sendCnt;
@@ -108,32 +110,22 @@ public class LateralTCPSender
     protected void init( final String host, final int port )
         throws IOException
     {
+        log.info( "Attempting connection to [{0}:{1}]", host, port );
+
         try
         {
-            log.info( "Attempting connection to [{0}]", host );
-
-            try
-            {
-                this.socket = new Socket();
-                this.socket.connect(new InetSocketAddress(host, port), 
this.socketOpenTimeOut);
-            }
-            catch ( final IOException ioe )
-            {
-                throw new IOException( "Cannot connect to " + host + ":" + 
port, ioe );
-            }
-
-            socket.setSoTimeout( socketSoTimeOut );
-        }
-        catch ( final java.net.ConnectException e )
-        {
-            log.debug( "Remote host [{0}] refused connection.", host );
-            throw e;
+            client = AsynchronousSocketChannel.open();
+            InetSocketAddress hostAddress = new InetSocketAddress(host, port);
+            Future<Void> future = client.connect(hostAddress);
+
+            future.get(this.socketOpenTimeOut, TimeUnit.MILLISECONDS);
         }
-        catch ( final IOException e )
+        catch (final IOException | InterruptedException | ExecutionException | 
TimeoutException ioe)
         {
-            log.debug( "Could not connect to [{0}]", host, e );
-            throw e;
+            throw new IOException( "Cannot connect to " + host + ":" + port, 
ioe );
         }
+
+        // socket.setSoTimeout( socketSoTimeOut );
     }
 
     /**
@@ -148,7 +140,7 @@ public class LateralTCPSender
         sendCnt++;
         if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
         {
-            log.info( "Send Count (port {0}) = {1}", socket.getPort(), sendCnt 
);
+            log.info( "Send Count {0} = {1}", client.getRemoteAddress(), 
sendCnt );
         }
 
         log.debug( "sending LateralElementDescriptor" );
@@ -158,13 +150,10 @@ public class LateralTCPSender
             return;
         }
 
-        final OutputStream sos = socket.getOutputStream();
-
         lock.lock();
         try
         {
-            serializer.serializeTo(led, sos);
-            sos.flush();
+            serializer.serializeTo(led, client, socketSoTimeOut);
         }
         finally
         {
@@ -201,31 +190,15 @@ public class LateralTCPSender
         lock.lock();
         try
         {
-            try
-            {
-                // clean up input stream, nothing should be there yet.
-                if ( socket.getInputStream().available() > 0 )
-                {
-                    socket.getInputStream().read( new 
byte[socket.getInputStream().available()] );
-                }
-            }
-            catch ( final IOException ioe )
-            {
-                log.error( "Problem cleaning socket before send {0}", socket, 
ioe );
-                throw ioe;
-            }
-
             // write object to listener
             send(led);
-
-            InputStream sis = socket.getInputStream();
-            response = serializer.deSerializeFrom(sis, null);
+            response = serializer.deSerializeFrom(client, socketSoTimeOut, 
null);
         }
         catch ( final IOException | ClassNotFoundException ioe )
         {
-            final String message = "Could not open InputStream to " + socket +
-                " SoTimeout [" + socket.getSoTimeout() +
-                "] Connected [" + socket.isConnected() + "]";
+            final String message = "Could not open InputStream to " +
+                client.getRemoteAddress() + " SoTimeout [" + socketSoTimeOut +
+                "] Connected [" + client.isOpen() + "]";
             log.error( message, ioe );
             throw new IOException(message, ioe);
         }
@@ -248,6 +221,6 @@ public class LateralTCPSender
         throws IOException
     {
         log.info( "Dispose called" );
-        socket.close();
+        client.close();
     }
 }
diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/behavior/IElementSerializer.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/behavior/IElementSerializer.java
index ce72212..56fc2f5 100644
--- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/behavior/IElementSerializer.java
+++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/engine/behavior/IElementSerializer.java
@@ -24,8 +24,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousByteChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Defines the behavior for cache element serializers. This layer of 
abstraction allows us to plug
@@ -59,8 +64,8 @@ public interface IElementSerializer
         throws IOException, ClassNotFoundException;
 
     /**
-     * Convenience method to write serialized object into a stream
-     * The stream data will be prepended with a four-byte length prefix
+     * Convenience method to write serialized object into a stream.
+     * The stream data will be prepended with a four-byte length prefix.
      *
      * @param <T> the type of the object
      * @param obj the object to serialize
@@ -82,8 +87,8 @@ public interface IElementSerializer
     }
 
     /**
-     * Convenience method to write serialized object into a channel
-     * The stream data will be prepended with a four-byte length prefix
+     * Convenience method to write serialized object into a channel.
+     * The stream data will be prepended with a four-byte length prefix.
      *
      * @param <T> the type of the object
      * @param obj the object to serialize
@@ -100,12 +105,54 @@ public interface IElementSerializer
         buffer.put(serialized);
         buffer.flip();
 
-        oc.write(buffer);
-        return buffer.capacity();
+        int count = 0;
+        while (buffer.hasRemaining())
+        {
+            count += oc.write(buffer);
+        }
+        return count;
     }
 
     /**
-     * Convenience method to read serialized object from a stream
+     * Convenience method to write serialized object into an
+     * asynchronous channel.
+     * The stream data will be prepended with a four-byte length prefix.
+     *
+     * @param <T> the type of the object
+     * @param obj the object to serialize
+     * @param oc the output channel
+     * @param writeTimeoutMs the write timeout in milliseconds
+     * @return the number of bytes written
+     * @throws IOException if serialization or writing fails
+     */
+    default <T> int serializeTo(T obj, AsynchronousByteChannel oc, int 
writeTimeoutMs)
+        throws IOException
+    {
+        final byte[] serialized = serialize(obj);
+        final ByteBuffer buffer = ByteBuffer.allocate(4 + serialized.length);
+        buffer.putInt(serialized.length);
+        buffer.put(serialized);
+        buffer.flip();
+
+        int count = 0;
+        while (buffer.hasRemaining())
+        {
+            Future<Integer> bytesWritten = oc.write(buffer);
+            try
+            {
+                count += bytesWritten.get(writeTimeoutMs, 
TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException | ExecutionException | 
TimeoutException e)
+            {
+                throw new IOException("Write timeout exceeded " + 
writeTimeoutMs, e);
+            }
+        }
+
+        return count;
+    }
+
+    /**
+     * Convenience method to read serialized object from a stream.
      * The method expects to find a four-byte length prefix in the
      * stream data.
      *
@@ -135,7 +182,7 @@ public interface IElementSerializer
     }
 
     /**
-     * Convenience method to read serialized object from a channel
+     * Convenience method to read serialized object from a channel.
      * The method expects to find a four-byte length prefix in the
      * stream data.
      *
@@ -152,14 +199,78 @@ public interface IElementSerializer
         int read = ic.read(bufferSize);
         if (read < 0)
         {
-            throw new EOFException("End of stream reached");
+            throw new EOFException("End of stream reached (length)");
         }
         assert read == bufferSize.capacity();
         bufferSize.flip();
 
         final ByteBuffer serialized = ByteBuffer.allocate(bufferSize.getInt());
-        read = ic.read(serialized);
-        assert read == serialized.capacity();
+        while (serialized.remaining() > 0)
+        {
+            read = ic.read(serialized);
+            if (read < 0)
+            {
+                throw new EOFException("End of stream reached (object)");
+            }
+        }
+        serialized.flip();
+
+        return deSerialize(serialized.array(), loader);
+    }
+
+    /**
+     * Convenience method to read serialized object from an
+     * asynchronous channel.
+     * The method expects to find a four-byte length prefix in the
+     * stream data.
+     *
+     * @param <T> the type of the object
+     * @param ic the input channel
+     * @param readTimeoutMs the read timeout in milliseconds
+     * @param loader class loader to use
+     * @throws IOException if serialization or reading fails
+     * @throws ClassNotFoundException thrown if we don't know the object.
+     */
+    default <T> T deSerializeFrom(AsynchronousByteChannel ic, int 
readTimeoutMs, ClassLoader loader)
+        throws IOException, ClassNotFoundException
+    {
+        final ByteBuffer bufferSize = ByteBuffer.allocate(4);
+        Future<Integer> readFuture = ic.read(bufferSize);
+
+        try
+        {
+            int read = readFuture.get(readTimeoutMs, TimeUnit.MILLISECONDS);
+            if (read < 0)
+            {
+                throw new EOFException("End of stream reached (length)");
+            }
+            assert read == bufferSize.capacity();
+        }
+        catch (InterruptedException | ExecutionException | TimeoutException e)
+        {
+            throw new IOException("Read timeout exceeded (length)" + 
readTimeoutMs, e);
+        }
+
+        bufferSize.flip();
+
+        final ByteBuffer serialized = ByteBuffer.allocate(bufferSize.getInt());
+        while (serialized.remaining() > 0)
+        {
+            readFuture = ic.read(serialized);
+            try
+            {
+                int read = readFuture.get(readTimeoutMs, 
TimeUnit.MILLISECONDS);
+                if (read < 0)
+                {
+                    throw new EOFException("End of stream reached (object)");
+                }
+            }
+            catch (InterruptedException | ExecutionException | 
TimeoutException e)
+            {
+                throw new IOException("Read timeout exceeded (object)" + 
readTimeoutMs, e);
+            }
+        }
+
         serialized.flip();
 
         return deSerialize(serialized.array(), loader);

Reply via email to