This is an automated email from the ASF dual-hosted git repository.

johnnyv pushed a commit to branch bugfix/DIRMINA-1173
in repository https://gitbox.apache.org/repos/asf/mina.git


The following commit(s) were added to refs/heads/bugfix/DIRMINA-1173 by this 
push:
     new ec93bb746 nonblock SSL now passes all tests
ec93bb746 is described below

commit ec93bb746b1a3216167341bf2470ae939fba55c8
Author: Jonathan Valliere <jon.valli...@emoten.com>
AuthorDate: Mon Feb 19 22:25:50 2024 -0500

    nonblock SSL now passes all tests
    
    The following public endpoints for SSLHandlerG1 now correctly handle
    non-blocking operations:
    - open
    - write
    - receive
    - ack
    - flush
    - close
    I added try..finally blocks to ensure that already-processed messages are
    still forwarded even if a subsequent message causes SSL processing to fail.
---
 .../org/apache/mina/filter/ssl/SSLHandlerG1.java   | 220 ++++++++++-----------
 .../java/org/apache/mina/filter/ssl/SslFilter.java |  15 +-
 2 files changed, 115 insertions(+), 120 deletions(-)

diff --git 
a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java 
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
index 5cb6858ea..80f52752b 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
@@ -68,7 +68,7 @@ import java.util.concurrent.Executor;
     /**
      * Enable asynchronous tasks
      */
-    static protected final boolean ENABLE_ASYNC_TASKS = false;
+    static protected final boolean ENABLE_ASYNC_TASKS = true;
 
     /**
      * Indicates whether the first handshake was completed
@@ -142,25 +142,21 @@ import java.util.concurrent.Executor;
      */
     @Override
     public void open(NextFilter next) throws SSLException {
-        synchronized (this) {
-            if (mHandshakeStarted == false) {
-                mHandshakeStarted = true;
-                if (mEngine.getUseClientMode()) {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("{} open() - begin handshaking", this);
+        try {
+            synchronized (this) {
+                if (mHandshakeStarted == false) {
+                    mHandshakeStarted = true;
+                    if (mEngine.getUseClientMode()) {
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("{} open() - begin handshaking", 
this);
+                        }
+                        mEngine.beginHandshake();
+                        write_handshake(next);
                     }
-                    mEngine.beginHandshake();
-                    write_handshake(next);
                 }
             }
-        }
-        synchronized (mWriteQueue) {
-            EncryptedWriteRequest x;
-            while((x = mWriteQueue.poll()) != null) {
-                next.filterWrite(mSession, x);
-            }
-        }
-        synchronized (this) {
+        } finally {
+            forward_writes(next);
             throw_pending_error(next);
         }
     }
@@ -170,20 +166,11 @@ import java.util.concurrent.Executor;
      */
     @Override
     public void receive(NextFilter next, IoBuffer message) throws SSLException 
{
-        receive_start(next, message);
-        synchronized (mReceiveQueue) {
-            IoBuffer x;
-            while((x = mReceiveQueue.poll()) != null) {
-                next.messageReceived(mSession, x);
-            }
-        }
-        synchronized (mWriteQueue) {
-            EncryptedWriteRequest x;
-            while((x = mWriteQueue.poll()) != null) {
-                next.filterWrite(mSession, x);
-            }
-        }
-        synchronized (this) {
+        try {
+            receive_start(next, message);
+        } finally {
+            forward_received(next);
+            forward_writes(next);
             throw_pending_error(next);
         }
     }
@@ -267,7 +254,7 @@ import java.util.concurrent.Executor;
                 LOGGER.debug("{} receive_loop() - result {}", toString(), 
dest);
             }
 
-            mReceiveQueue.push(dest);
+            mReceiveQueue.add(dest);
         }
 
         switch (result.getHandshakeStatus()) {
@@ -323,25 +310,25 @@ import java.util.concurrent.Executor;
      */
     @Override
     public void ack(NextFilter next, WriteRequest request) throws SSLException 
{
-        synchronized (this) {
-            if (mAckQueue.remove(request)) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} ack() - {}", toString(), request);
-                }
+        try {
+            synchronized (this) {
+                if (mAckQueue.remove(request)) {
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("{} ack() - accepted {}", toString(), 
request);
+                    }
 
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} ack() - checking to see if any messages 
can be flushed", toString(), request);
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("{} ack() - checking to see if any 
messages can be flushed", toString(), request);
+                    }
+                    flush_start(next);
+                } else {
+                    if(LOGGER.isWarnEnabled()) {
+                        LOGGER.warn("{} ack() - unknown message {}", 
toString(), request);
+                    }
                 }
-                flush_start(next);
             }
-        }
-        synchronized (mWriteQueue) {
-            EncryptedWriteRequest x;
-            while((x = mWriteQueue.poll()) != null) {
-                next.filterWrite(mSession, x);
-            }
-        }
-        synchronized (this) {
+        } finally {
+            forward_writes(next);
             throw_pending_error(next);
         }
     }
@@ -351,41 +338,37 @@ import java.util.concurrent.Executor;
      */
     @Override
     public void write(NextFilter next, WriteRequest request) throws 
SSLException, WriteRejectedException {
-        synchronized (this) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("{} write() - source {}", toString(), request);
-            }
-            if (mOutboundClosing) {
-                throw new WriteRejectedException(request, "closing");
-            }
-            if (mEncodeQueue.isEmpty()) {
-                if (write_loop(next, request) == false) {
+        try {
+            synchronized (this) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("{} write() - source {}", toString(), 
request);
+                }
+                if (mOutboundClosing) {
+                    throw new WriteRejectedException(request, "closing");
+                }
+                if (mEncodeQueue.isEmpty()) {
+                    if (write_loop(next, request) == false) {
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("{} write() - unable to write right 
now, saving request for later", toString(),
+                                    request);
+                        }
+                        if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
+                            throw new BufferOverflowException();
+                        }
+                        mEncodeQueue.add(request);
+                    }
+                } else {
                     if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("{} write() - unable to write right now, 
saving request for later", toString(),
-                                request);
+                        LOGGER.debug("{} write() - unable to write right now, 
saving request for later", toString(), request);
                     }
                     if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
                         throw new BufferOverflowException();
                     }
                     mEncodeQueue.add(request);
                 }
-            } else {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} write() - unable to write right now, 
saving request for later", toString(), request);
-                }
-                if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
-                    throw new BufferOverflowException();
-                }
-                mEncodeQueue.add(request);
             }
-        }
-        synchronized (mWriteQueue) {
-            EncryptedWriteRequest x;
-            while((x = mWriteQueue.poll()) != null) {
-                next.filterWrite(mSession, x);
-            }
-        }
-        synchronized (this) {
+        } finally {
+            forward_writes(next);
             throw_pending_error(next);
         }
     }
@@ -404,7 +387,7 @@ import java.util.concurrent.Executor;
     @SuppressWarnings("incomplete-switch")
     synchronized protected boolean write_loop(NextFilter next, WriteRequest 
request) throws SSLException {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("{} write_user_loop() - source {}", toString(), 
request);
+            LOGGER.debug("{} write_loop() - source {}", toString(), request);
         }
 
         IoBuffer source = IoBuffer.class.cast(request.getMessage());
@@ -413,7 +396,7 @@ import java.util.concurrent.Executor;
         SSLEngineResult result = mEngine.wrap(source.buf(), dest.buf());
 
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("{} write_user_loop() - bytes-consumed {}, 
bytes-produced {}, status {}, handshake {}",
+            LOGGER.debug("{} write_loop() - bytes-consumed {}, bytes-produced 
{}, status {}, handshake {}",
                     toString(), result.bytesConsumed(), 
result.bytesProduced(), result.getStatus(),
                     result.getHandshakeStatus());
         }
@@ -426,10 +409,10 @@ import java.util.concurrent.Executor;
                 EncryptedWriteRequest encrypted = new 
EncryptedWriteRequest(dest, null);
                 
                 if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} write_user_loop() - result {}", 
toString(), encrypted);
+                    LOGGER.debug("{} write_loop() - result {}", toString(), 
encrypted);
                 }
 
-                mWriteQueue.push(encrypted);
+                mWriteQueue.add(encrypted);
                 // do not return because we want to enter the handshake switch
             } else {
                 // then we probably consumed some data
@@ -437,28 +420,25 @@ import java.util.concurrent.Executor;
                 
                 if (source.hasRemaining()) {
                     EncryptedWriteRequest encrypted = new 
EncryptedWriteRequest(dest, null);
-                    mAckQueue.add(encrypted);
-                    
                     if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("{} write_user_loop() - result {}", 
toString(), encrypted);
+                        LOGGER.debug("{} write_loop() - result {}", 
toString(), encrypted);
                     }
 
-                    mWriteQueue.push(encrypted);
+                    mWriteQueue.add(encrypted);
                     
-                    if (mAckQueue.size() < MAX_UNACK_MESSAGES) {
+                    if (mWriteQueue.size() + mAckQueue.size() < 
MAX_UNACK_MESSAGES) {
                         return write_loop(next, request); // write additional 
chunks
                     }
                     
                     return false;
                 } else {
                     EncryptedWriteRequest encrypted = new 
EncryptedWriteRequest(dest, request);
-                    mAckQueue.add(encrypted);
-                    
+
                     if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("{} write_user_loop() - result {}", 
toString(), encrypted);
+                        LOGGER.debug("{} write_loop() - result {}", 
toString(), encrypted);
                     }
 
-                    mWriteQueue.push(encrypted);
+                    mWriteQueue.add(encrypted);
                     
                     return true;
                 }
@@ -469,7 +449,7 @@ import java.util.concurrent.Executor;
         switch (result.getHandshakeStatus()) {
             case NEED_TASK:
                 if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} write_user_loop() - handshake needs task, 
scheduling", toString());
+                    LOGGER.debug("{} write_loop() - handshake needs task, 
scheduling", toString());
                 }
                 
                 schedule_task(next);
@@ -477,14 +457,14 @@ import java.util.concurrent.Executor;
                 
             case NEED_WRAP:
                 if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} write_user_loop() - handshake needs wrap, 
looping", toString());
+                    LOGGER.debug("{} write_loop() - handshake needs wrap, 
looping", toString());
                 }
                 
                 return write_loop(next, request);
                 
             case FINISHED:
                 if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("{} write_user_loop() - handshake finished, 
flushing queue", toString());
+                    LOGGER.debug("{} write_loop() - handshake finished, 
flushing queue", toString());
                 }
                 
                 finish_handshake(next);
@@ -581,7 +561,7 @@ import java.util.concurrent.Executor;
             }
             
             EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, 
null);
-            mWriteQueue.push(encrypted);
+            mWriteQueue.add(encrypted);
         }
 
         switch (result.getHandshakeStatus()) {
@@ -641,14 +621,10 @@ import java.util.concurrent.Executor;
      * {@inheritDoc}
      */
     public void flush(NextFilter next) throws SSLException {
-        flush_start(next);
-        synchronized (mWriteQueue) {
-            EncryptedWriteRequest x;
-            while((x = mWriteQueue.poll()) != null) {
-                next.filterWrite(mSession, x);
-            }
-        }
-        synchronized (this) {
+        try {
+            flush_start(next);
+        } finally {
+            forward_writes(next);
             throw_pending_error(next);
         }
     }
@@ -701,14 +677,10 @@ import java.util.concurrent.Executor;
      */
     @Override
     public void close(NextFilter next, boolean linger) throws SSLException {
-        close_start(next, linger);
-        synchronized (mWriteQueue) {
-            EncryptedWriteRequest x;
-            while((x = mWriteQueue.poll()) != null) {
-                next.filterWrite(mSession, x);
-            }
-        }
-        synchronized (this) {
+        try {
+            close_start(next, linger);
+        } finally {
+            forward_writes(next);
             throw_pending_error(next);
         }
     }
@@ -747,13 +719,13 @@ import java.util.concurrent.Executor;
      */
     synchronized protected void throw_pending_error(NextFilter next) throws 
SSLException {
         SSLException sslException = mPendingError;
-        
         if (sslException != null) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("{} throw_pending_error() - throwing pending 
error");
+            }
             // Loop to send back the alert messages
             receive_loop(next, null);
-            
             mPendingError = null;
-            
             // And finally rethrow the exception 
             throw sslException;
         }
@@ -770,6 +742,31 @@ import java.util.concurrent.Executor;
         }
     }
 
+    protected void forward_received(NextFilter next) {
+        synchronized (mReceiveQueue) {
+            IoBuffer x;
+            while ((x = mReceiveQueue.poll()) != null) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("{} forward_received() - received {}", 
toString(), x);
+                }
+                next.messageReceived(mSession, x);
+            }
+        }
+    }
+
+    protected void forward_writes(NextFilter next) {
+        synchronized (mWriteQueue) {
+            EncryptedWriteRequest x;
+            while ((x = mWriteQueue.poll()) != null) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("{} forward_writes() - writing {}", 
toString(), x);
+                }
+                mAckQueue.add(x);
+                next.filterWrite(mSession, x);
+            }
+        }
+    }
+
     /**
      * Schedule a SSLEngine task for execution, either using an Executor, or 
immediately.
      *  
@@ -792,7 +789,6 @@ import java.util.concurrent.Executor;
      */
     synchronized protected void execute_task(NextFilter next) {
         Runnable task;
-        
         while ((task = mEngine.getDelegatedTask()) != null) {
             try {
                 if (LOGGER.isDebugEnabled()) {
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java 
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
index 90329f219..c6340f2a0 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
@@ -447,15 +447,14 @@ public class SslFilter extends IoFilterAdapter {
      */
     @Override
     public void messageSent(NextFilter next, IoSession session, WriteRequest 
request) throws Exception {
-        if (LOGGER.isDebugEnabled()) {
-            if (session.isServer()) {
-                LOGGER.debug("SERVER: Session {} ack {}", session, request);
-            } else {
-                LOGGER.debug("CLIENT: Session {} ack {}", session, request);
-            }
-        }
-
         if (request instanceof EncryptedWriteRequest) {
+            if (LOGGER.isDebugEnabled()) {
+                if (session.isServer()) {
+                    LOGGER.debug("SERVER: Session {} ack {}", session, 
request);
+                } else {
+                    LOGGER.debug("CLIENT: Session {} ack {}", session, 
request);
+                }
+            }
             EncryptedWriteRequest encryptedWriteRequest = 
EncryptedWriteRequest.class.cast(request);
             SslHandler sslHandler = getSslHandler(session);
             sslHandler.ack(next, request);

Reply via email to