This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA1132
in repository https://gitbox.apache.org/repos/asf/mina.git


The following commit(s) were added to refs/heads/bugfix/DIRMINA1132 by this push:
     new 726cd81  Adds SSL2 flow control
726cd81 is described below

commit 726cd814ed82b5e272c678d772631022ea79ff7b
Author: Jonathan Valliere <john...@apache.org>
AuthorDate: Sat Jul 24 13:33:57 2021 -0400

    Adds SSL2 flow control
---
 .../mina/filter/ssl2/EncryptedWriteRequest.java    |  23 +---
 .../org/apache/mina/filter/ssl2/SSL2Filter.java    |  83 ++++++++-------
 .../org/apache/mina/filter/ssl2/SSL2Handler.java   |  20 +++-
 .../org/apache/mina/filter/ssl2/SSL2HandlerG0.java | 116 ++++++++++++++-------
 .../apache/mina/filter/ssl2/SSL2SimpleTest.java    |  21 ++--
 5 files changed, 157 insertions(+), 106 deletions(-)

diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
index 91fabc7..caf32d7 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java
@@ -1,32 +1,19 @@
 package org.apache.mina.filter.ssl2;
 
-import org.apache.mina.core.future.WriteFuture;
 import org.apache.mina.core.write.DefaultWriteRequest;
 import org.apache.mina.core.write.WriteRequest;
 
 public class EncryptedWriteRequest extends DefaultWriteRequest {
 
        // The original message
-       private WriteRequest parentRequest;
+       private WriteRequest originalRequest;
 
        public EncryptedWriteRequest(Object encodedMessage, WriteRequest 
parent) {
-               super(encodedMessage, null);
+               super(encodedMessage, parent != null ? parent.getFuture() : 
null);
+               this.originalRequest = parent != null ? parent : this;
        }
 
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public boolean isEncoded() {
-               return true;
-       }
-
-       public WriteRequest getParentRequest() {
-               return this.parentRequest;
-       }
-
-       @Override
-       public WriteFuture getFuture() {
-               return (this.getParentRequest() != null) ? 
this.getParentRequest().getFuture() : super.getFuture();
+       public WriteRequest getOriginalRequest() {
+               return this.originalRequest;
        }
 }
\ No newline at end of file
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
index 052f806..80e5688 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java
@@ -19,6 +19,7 @@
  */
 package org.apache.mina.filter.ssl2;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -38,16 +39,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An SSL filter that encrypts and decrypts the data exchanged in the session.
- * Adding this filter triggers SSL handshake procedure immediately by sending a
- * SSL 'hello' message, so you don't need to call {@link #startSsl(IoSession)}
- * manually unless you are implementing StartTLS (see below). If you don't want
- * the handshake procedure to start immediately, please specify {@code false} 
as
- * {@code autoStart} parameter in the constructor.
+ * An SSL Filter which simplifies and controls the flow of encrypted 
information
+ * on the filter-chain.
  * <p>
- * This filter uses an {@link SSLEngine} which was introduced in Java 5, so 
Java
- * version 5 or above is mandatory to use this filter. And please note that 
this
- * filter only works for TCP/IP connections.
+ * The initial handshake is automatically enabled for "client" sessions once 
the
+ * filter is added to the filter-chain and the session is connected.
  *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
@@ -160,15 +156,6 @@ public class SSL2Filter extends IoFilterAdapter {
                this.mEnabledProtocols = protocols;
        }
 
-       /**
-        * Executed just before the filter is added into the chain, we do :
-        * <ul>
-        * <li>check that we don't have a SSL filter already present
-        * <li>we update the next filter
-        * <li>we create the SSL handler helper class
-        * <li>and we store it into the session's Attributes
-        * </ul>
-        */
        @Override
        public void onPreAdd(IoFilterChain parent, String name, NextFilter 
next) throws Exception {
                // Check that we don't have a SSL filter already present in the 
chain
@@ -184,34 +171,47 @@ public class SSL2Filter extends IoFilterAdapter {
        }
 
        @Override
-       public void onPreRemove(IoFilterChain parent, String name, NextFilter 
next) throws Exception {
+       public void onPostAdd(IoFilterChain parent, String name, NextFilter 
next) throws Exception {
                IoSession session = parent.getSession();
-               session.removeAttribute(SSL_HANDLER);
+               if (session.isConnected()) {
+                       this.sessionConnected(next, session);
+               }
+               super.onPostAdd(parent, name, next);
        }
 
        @Override
-       public void sessionOpened(NextFilter next, IoSession session) throws 
Exception {
-
-               LOGGER.debug("session openend {}", session);
+       public void onPreRemove(IoFilterChain parent, String name, NextFilter 
next) throws Exception {
+               IoSession session = parent.getSession();
+               SSL2Handler x = 
SSL2Handler.class.cast(session.removeAttribute(SSL_HANDLER));
+               if (x != null) {
+                       x.close(next);
+               }
+       }
 
+       protected void sessionConnected(NextFilter next, IoSession session) 
throws Exception {
                SSL2Handler x = 
SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
 
                if (x == null) {
-                       SSLEngine e = mContext.createSSLEngine();
-
+                       InetSocketAddress s = 
InetSocketAddress.class.cast(session.getRemoteAddress());
+                       SSLEngine e = 
mContext.createSSLEngine(s.getHostString(), s.getPort());
                        e.setNeedClientAuth(mNeedClientAuth);
                        e.setWantClientAuth(mWantClientAuth);
                        e.setEnabledCipherSuites(mEnabledCipherSuites);
                        e.setEnabledProtocols(mEnabledProtocols);
                        e.setUseClientMode(!session.isServer());
-
                        x = new SSL2HandlerG0(e, EXECUTOR, session);
-
                        session.setAttribute(SSL_HANDLER, x);
                }
 
                x.open(next);
-               
+       }
+
+       @Override
+       public void sessionOpened(NextFilter next, IoSession session) throws 
Exception {
+               if (LOGGER.isDebugEnabled())
+                       LOGGER.debug("session {} openend", session);
+
+               this.sessionConnected(next, session);
                super.sessionOpened(next, session);
        }
 
@@ -222,29 +222,32 @@ public class SSL2Filter extends IoFilterAdapter {
        }
 
        @Override
-       public void messageSent(NextFilter next, IoSession session, 
WriteRequest writeRequest) throws Exception {
-               if (writeRequest instanceof EncryptedWriteRequest) {
-                       EncryptedWriteRequest e = 
EncryptedWriteRequest.class.cast(writeRequest);
+       public void messageSent(NextFilter next, IoSession session, 
WriteRequest request) throws Exception {
+               if (LOGGER.isDebugEnabled())
+                       LOGGER.debug("session {} sent {}", session, request);
+
+               if (request instanceof EncryptedWriteRequest) {
+                       EncryptedWriteRequest e = 
EncryptedWriteRequest.class.cast(request);
                        SSL2Handler x = 
SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
-                       x.ack(next, writeRequest);
-                       if (e.getParentRequest() != null) {
-                               next.messageSent(session, e.getParentRequest());
+                       x.ack(next, request);
+                       if (e.getOriginalRequest() != e) {
+                               next.messageSent(session, 
e.getOriginalRequest());
                        }
                } else {
-                       super.messageSent(next, session, writeRequest);
+                       super.messageSent(next, session, request);
                }
        }
 
        @Override
-       public void filterWrite(NextFilter next, IoSession session, 
WriteRequest writeRequest) throws Exception {
+       public void filterWrite(NextFilter next, IoSession session, 
WriteRequest request) throws Exception {
 
-               LOGGER.debug("session write {}", session);
+               LOGGER.debug("session {} write {}", session, request);
 
-               if (writeRequest instanceof EncryptedWriteRequest) {
-                       super.filterWrite(next, session, writeRequest);
+               if (request instanceof EncryptedWriteRequest) {
+                       super.filterWrite(next, session, request);
                } else {
                        SSL2Handler x = 
SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER));
-                       x.write(next, writeRequest);
+                       x.write(next, request);
                }
        }
 }
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
index 1e8e59b..d8eb1eb 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java
@@ -18,6 +18,21 @@ import org.slf4j.LoggerFactory;
 public abstract class SSL2Handler {
 
        /**
+        * Minimum size of encoder buffer in packets
+        */
+       static protected final int MIN_ENCODER_PACKETS = 2;
+
+       /**
+        * Maximum size of encoder buffer in packets
+        */
+       static protected final int MAX_ENCODER_PACKETS = 8;
+
+       /**
+        * Zero length buffer used to prime the ssl engine
+        */
+       static protected final IoBuffer ZERO = IoBuffer.allocate(0, true);
+
+       /**
         * Static logger
         */
        static protected final Logger LOGGER = 
LoggerFactory.getLogger(SSL2Handler.class);
@@ -25,7 +40,7 @@ public abstract class SSL2Handler {
        /**
         * Write Requests which are enqueued prior to the completion of the 
handshaking
         */
-       protected final Deque<WriteRequest> mWriteQueue = new 
ConcurrentLinkedDeque<>();
+       protected final Deque<WriteRequest> mEncodeQueue = new 
ConcurrentLinkedDeque<>();
 
        /**
         * Requests which have been sent to the socket and waiting 
acknowledgment
@@ -200,7 +215,8 @@ public abstract class SSL2Handler {
                SSLSession session = this.mEngine.getHandshakeSession();
                if (session == null)
                        session = this.mEngine.getSession();
-               int packets = Math.max(2, Math.min(16, 1 + (estimate / 
session.getApplicationBufferSize())));
+               int packets = Math.max(MIN_ENCODER_PACKETS,
+                               Math.min(MAX_ENCODER_PACKETS, 1 + (estimate / 
session.getApplicationBufferSize())));
                return IoBuffer.allocate(packets * 
session.getPacketBufferSize());
        }
 
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
index 803927f..9961a32 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java
@@ -4,7 +4,6 @@ import java.util.concurrent.Executor;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLException;
 
 import org.apache.mina.core.buffer.IoBuffer;
@@ -14,36 +13,42 @@ import org.apache.mina.core.write.WriteRequest;
 
 public class SSL2HandlerG0 extends SSL2Handler {
 
+       /**
+        * Maximum number of messages waiting acknowledgement
+        */
+       static protected final int MAX_UNACK_MESSAGES = 6;
+
        public SSL2HandlerG0(SSLEngine p, Executor e, IoSession s) {
                super(p, e, s);
        }
 
+       /**
+        * {@inheritDoc}
+        */
        synchronized public void open(final NextFilter next) throws 
SSLException {
                if (this.mEngine.getUseClientMode()) {
-
                        if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("{} open() - begin handshaking", 
toString());
                        }
-
                        this.mEngine.beginHandshake();
                        this.lwrite(next);
                }
        }
 
+       /**
+        * {@inheritDoc}
+        */
        synchronized public void receive(final NextFilter next, final IoBuffer 
message) throws SSLException {
-
+               final IoBuffer source = resume_decode_buffer(message);
                if (LOGGER.isDebugEnabled()) {
-                       LOGGER.debug("{} receive() - source {}", toString(), 
message);
+                       LOGGER.debug("{} receive() - source {}", toString(), 
source);
                }
-
-               final IoBuffer input = resume_decode_buffer(message);
-
                try {
-                       while (lreceive(next, input) && message.hasRemaining()) 
{
-                               // spin
+                       while (lreceive(next, source) && 
message.hasRemaining()) {
+                               // loop until the message is consumed
                        }
                } finally {
-                       save_decode_buffer(input);
+                       save_decode_buffer(source);
                }
        }
 
@@ -63,7 +68,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
                        LOGGER.debug("{} lreceive() - source {}", toString(), 
message);
                }
 
-               final IoBuffer source = message == null ? IoBuffer.allocate(0) 
: message;
+               final IoBuffer source = message == null ? ZERO : message;
                final IoBuffer dest = allocate_app_buffer(source.remaining());
 
                final SSLEngineResult result = mEngine.unwrap(source.buf(), 
dest.buf());
@@ -77,11 +82,9 @@ public class SSL2HandlerG0 extends SSL2Handler {
                        dest.free();
                } else {
                        dest.flip();
-
                        if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("{} lreceive() - result {}", 
toString(), dest);
                        }
-
                        next.messageReceived(this.mSession, dest);
                }
 
@@ -109,30 +112,42 @@ public class SSL2HandlerG0 extends SSL2Handler {
                return result.bytesConsumed() > 0;
        }
 
+       /**
+        * {@inheritDoc}
+        */
        synchronized public void ack(final NextFilter next, final WriteRequest 
request) throws SSLException {
-
+               if (this.mAckQueue.remove(request)) {
+                       if (LOGGER.isDebugEnabled()) {
+                               LOGGER.debug("{} ack() - {}", toString(), 
request);
+                       }
+                       if (LOGGER.isDebugEnabled()) {
+                               LOGGER.debug("{} ack() - checking to see if any 
messages can be flushed", toString(), request);
+                       }
+                       this.lflush(next);
+               }
        }
 
+       /**
+        * {@inheritDoc}
+        */
        synchronized public void write(final NextFilter next, final 
WriteRequest request) throws SSLException {
-
                if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("{} write() - source {}", toString(), 
request);
                }
 
-               if (this.mWriteQueue.isEmpty()) {
+               if (this.mEncodeQueue.isEmpty()) {
                        if (lwrite(next, request) == false) {
                                if (LOGGER.isDebugEnabled()) {
-                                       LOGGER.debug("{} write() - unable to 
write right now, saving request for later", toString(), request);
+                                       LOGGER.debug("{} write() - unable to 
write right now, saving request for later", toString(),
+                                                       request);
                                }
-
-                               this.mWriteQueue.add(request);
+                               this.mEncodeQueue.add(request);
                        }
                } else {
                        if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("{} write() - unable to write 
right now, saving request for later", toString(), request);
                        }
-
-                       this.mWriteQueue.add(request);
+                       this.mEncodeQueue.add(request);
                }
        }
 
@@ -142,7 +157,8 @@ public class SSL2HandlerG0 extends SSL2Handler {
         * @param request
         * @param session
         * @param next
-        * @return {@code true} if the WriteRequest was successfully written
+        * @return {@code true} if the WriteRequest was fully consumed; 
otherwise
+        *         {@code false}
         * @throws SSLException
         */
        @SuppressWarnings("incomplete-switch")
@@ -166,19 +182,35 @@ public class SSL2HandlerG0 extends SSL2Handler {
                        dest.free();
                } else {
                        if (result.bytesConsumed() == 0) {
-                               next.filterWrite(this.mSession, new 
EncryptedWriteRequest(dest, null));
+                               EncryptedWriteRequest encrypted = new 
EncryptedWriteRequest(dest, null);
+                               if (LOGGER.isDebugEnabled()) {
+                                       LOGGER.debug("{} lwrite() - result {}", 
toString(), encrypted);
+                               }
+                               next.filterWrite(this.mSession, encrypted);
                        } else {
                                // then we probably consumed some data
                                dest.flip();
                                if (source.hasRemaining()) {
-                                       next.filterWrite(this.mSession, new 
EncryptedWriteRequest(dest, null));
-                                       lwrite(next, request); // write 
additional chunks
+                                       EncryptedWriteRequest encrypted = new 
EncryptedWriteRequest(dest, null);
+                                       this.mAckQueue.add(encrypted);
+                                       if (LOGGER.isDebugEnabled()) {
+                                               LOGGER.debug("{} lwrite() - 
result {}", toString(), encrypted);
+                                       }
+                                       next.filterWrite(this.mSession, 
encrypted);
+                                       if (this.mAckQueue.size() < 
MAX_UNACK_MESSAGES) {
+                                               return lwrite(next, request); 
// write additional chunks
+                                       }
+                                       return false;
                                } else {
                                        source.rewind();
-                                       next.filterWrite(this.mSession, new 
EncryptedWriteRequest(dest, request));
+                                       EncryptedWriteRequest encrypted = new 
EncryptedWriteRequest(dest, request);
+                                       this.mAckQueue.add(encrypted);
+                                       if (LOGGER.isDebugEnabled()) {
+                                               LOGGER.debug("{} lwrite() - 
result {}", toString(), encrypted);
+                                       }
+                                       next.filterWrite(this.mSession, 
encrypted);
+                                       return true;
                                }
-
-                               return true;
                        }
                }
 
@@ -206,7 +238,6 @@ public class SSL2HandlerG0 extends SSL2Handler {
                }
 
                return false;
-
        }
 
        /**
@@ -224,7 +255,7 @@ public class SSL2HandlerG0 extends SSL2Handler {
                        LOGGER.debug("{} lwrite() - internal", toString());
                }
 
-               final IoBuffer source = IoBuffer.allocate(0);
+               final IoBuffer source = ZERO;
                final IoBuffer dest = 
allocate_encode_buffer(source.remaining());
 
                final SSLEngineResult result = this.mEngine.wrap(source.buf(), 
dest.buf());
@@ -234,15 +265,13 @@ public class SSL2HandlerG0 extends SSL2Handler {
                                        result.bytesProduced());
                }
 
-               if (dest.position() == 0) {
+               if (result.bytesProduced() == 0) {
                        dest.free();
                } else {
                        dest.flip();
-
                        if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("{} lwrite() - result {}", 
toString(), dest);
                        }
-
                        final EncryptedWriteRequest encrypted = new 
EncryptedWriteRequest(dest, null);
                        next.filterWrite(this.mSession, encrypted);
                }
@@ -271,8 +300,14 @@ public class SSL2HandlerG0 extends SSL2Handler {
                return result.bytesProduced() > 0;
        }
 
+       /**
+        * Flushes the encode queue
+        * 
+        * @param next
+        * @throws SSLException
+        */
        synchronized protected void lflush(final NextFilter next) throws 
SSLException {
-               if (this.mWriteQueue.isEmpty()) {
+               if (this.mEncodeQueue.isEmpty()) {
                        if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("{} flush() - no saved messages", 
toString());
                        }
@@ -280,15 +315,20 @@ public class SSL2HandlerG0 extends SSL2Handler {
                }
 
                WriteRequest current = null;
-
-               while ((current = this.mWriteQueue.poll()) != null) {
+               while ((this.mAckQueue.size() < MAX_UNACK_MESSAGES) && (current 
= this.mEncodeQueue.poll()) != null) {
+                       if (LOGGER.isDebugEnabled()) {
+                               LOGGER.debug("{} flush() - {}", toString(), 
current);
+                       }
                        if (lwrite(next, current) == false) {
-                               this.mWriteQueue.addFirst(current);
+                               this.mEncodeQueue.addFirst(current);
                                break;
                        }
                }
        }
 
+       /**
+        * {@inheritDoc}
+        */
        synchronized public void close(final NextFilter next) throws 
SSLException {
                if (mEngine.isOutboundDone())
                        return;
diff --git a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
index beb6577..aef4ead 100644
--- a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
+++ b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java
@@ -77,13 +77,7 @@ public class SSL2SimpleTest {
 //
 //             }
 
-               client_socket.write(createWriteRequest());
-
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException e) {
-
-               }
+               
client_socket.write(createMosaicRequest()).awaitUninterruptibly();
 
                client_socket.closeNow();
 
@@ -104,7 +98,18 @@ public class SSL2SimpleTest {
                }
        }
 
-       public static IoBuffer createWriteRequest() {
+       public static IoBuffer createMosaicRequest() {
+               // HTTP request
+               IoBuffer message = IoBuffer.allocate(100 * 1024);
+               while (message.hasRemaining()) {
+                       message.putInt(0xFF332211);
+               }
+               message.flip();
+
+               return message;
+       }
+
+       public static IoBuffer createHttpRequest() {
                // HTTP request
                StringBuilder http = new StringBuilder();
                http.append("GET / HTTP/1.0\r\n");

Reply via email to