This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch bugfix/DIRMINA-1173 in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/bugfix/DIRMINA-1173 by this push: new ec93bb746 nonblock SSL now passes all tests ec93bb746 is described below commit ec93bb746b1a3216167341bf2470ae939fba55c8 Author: Jonathan Valliere <jon.valli...@emoten.com> AuthorDate: Mon Feb 19 22:25:50 2024 -0500 nonblock SSL now passes all tests The following public endpoints for SSLHandlerG1 now correctly handle the non-block operations - open - write - receive - ack - flush - close I added try..finally blocks to ensure processed messages are fired even if a subsequent message caused the SSL to fail. --- .../org/apache/mina/filter/ssl/SSLHandlerG1.java | 220 ++++++++++----------- .../java/org/apache/mina/filter/ssl/SslFilter.java | 15 +- 2 files changed, 115 insertions(+), 120 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java index 5cb6858ea..80f52752b 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java @@ -68,7 +68,7 @@ import java.util.concurrent.Executor; /** * Enable asynchronous tasks */ - static protected final boolean ENABLE_ASYNC_TASKS = false; + static protected final boolean ENABLE_ASYNC_TASKS = true; /** * Indicates whether the first handshake was completed @@ -142,25 +142,21 @@ import java.util.concurrent.Executor; */ @Override public void open(NextFilter next) throws SSLException { - synchronized (this) { - if (mHandshakeStarted == false) { - mHandshakeStarted = true; - if (mEngine.getUseClientMode()) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} open() - begin handshaking", this); + try { + synchronized (this) { + if (mHandshakeStarted == false) { + mHandshakeStarted = true; + if (mEngine.getUseClientMode()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} open() - begin handshaking", this); + } + mEngine.beginHandshake(); + write_handshake(next); } - mEngine.beginHandshake(); - write_handshake(next); } } - } - synchronized (mWriteQueue) { - EncryptedWriteRequest x; - while((x = mWriteQueue.poll()) != null) { - next.filterWrite(mSession, x); - } - } - synchronized (this) { + } finally { + forward_writes(next); throw_pending_error(next); } } @@ -170,20 +166,11 @@ import java.util.concurrent.Executor; */ @Override public void receive(NextFilter next, IoBuffer message) throws SSLException { - receive_start(next, message); - synchronized (mReceiveQueue) { - IoBuffer x; - while((x = mReceiveQueue.poll()) != null) { - next.messageReceived(mSession, x); - } - } - synchronized (mWriteQueue) { - EncryptedWriteRequest x; - while((x = mWriteQueue.poll()) != null) { - next.filterWrite(mSession, x); - } - } - synchronized (this) { + try { + receive_start(next, message); + } finally { + forward_received(next); + forward_writes(next); throw_pending_error(next); } } @@ -267,7 +254,7 @@ import java.util.concurrent.Executor; LOGGER.debug("{} receive_loop() - result {}", toString(), dest); } - mReceiveQueue.push(dest); + mReceiveQueue.add(dest); } switch (result.getHandshakeStatus()) { @@ -323,25 +310,25 @@ import java.util.concurrent.Executor; */ @Override public void ack(NextFilter next, WriteRequest request) throws SSLException { - synchronized (this) { - if (mAckQueue.remove(request)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} ack() - {}", toString(), request); - } + try { + synchronized (this) { + if (mAckQueue.remove(request)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} ack() - accepted {}", toString(), request); + } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} ack() - checking to see if any messages can be flushed", toString(), request); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} ack() - checking to see if any messages can be flushed", toString(), request); + } + flush_start(next); + } else { + if(LOGGER.isWarnEnabled()) { + LOGGER.warn("{} ack() - unknown message {}", toString(), request); + } } - flush_start(next); } - } - synchronized (mWriteQueue) { - EncryptedWriteRequest x; - while((x = mWriteQueue.poll()) != null) { - next.filterWrite(mSession, x); - } - } - synchronized (this) { + } finally { + forward_writes(next); throw_pending_error(next); } } @@ -351,41 +338,37 @@ import java.util.concurrent.Executor; */ @Override public void write(NextFilter next, WriteRequest request) throws SSLException, WriteRejectedException { - synchronized (this) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write() - source {}", toString(), request); - } - if (mOutboundClosing) { - throw new WriteRejectedException(request, "closing"); - } - if (mEncodeQueue.isEmpty()) { - if (write_loop(next, request) == false) { + try { + synchronized (this) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} write() - source {}", toString(), request); + } + if (mOutboundClosing) { + throw new WriteRejectedException(request, "closing"); + } + if (mEncodeQueue.isEmpty()) { + if (write_loop(next, request) == false) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), + request); + } + if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) { + throw new BufferOverflowException(); + } + mEncodeQueue.add(request); + } + } else { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), - request); + LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request); } if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) { throw new BufferOverflowException(); } mEncodeQueue.add(request); } - } else { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request); - } - if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) { - throw new BufferOverflowException(); - } - mEncodeQueue.add(request); } - } - synchronized (mWriteQueue) { - EncryptedWriteRequest x; - while((x = mWriteQueue.poll()) != null) { - next.filterWrite(mSession, x); - } - } - synchronized (this) { + } finally { + forward_writes(next); throw_pending_error(next); } } @@ -404,7 +387,7 @@ import java.util.concurrent.Executor; @SuppressWarnings("incomplete-switch") synchronized protected boolean write_loop(NextFilter next, WriteRequest request) throws SSLException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write_user_loop() - source {}", toString(), request); + LOGGER.debug("{} write_loop() - source {}", toString(), request); } IoBuffer source = IoBuffer.class.cast(request.getMessage()); @@ -413,7 +396,7 @@ import java.util.concurrent.Executor; SSLEngineResult result = mEngine.wrap(source.buf(), dest.buf()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write_user_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", + LOGGER.debug("{} write_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus()); } @@ -426,10 +409,10 @@ import java.util.concurrent.Executor; EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); + LOGGER.debug("{} write_loop() - result {}", toString(), encrypted); } - mWriteQueue.push(encrypted); + mWriteQueue.add(encrypted); // do not return because we want to enter the handshake switch } else { // then we probably consumed some data @@ -437,28 +420,25 @@ import java.util.concurrent.Executor; if (source.hasRemaining()) { EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); - mAckQueue.add(encrypted); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); + LOGGER.debug("{} write_loop() - result {}", toString(), encrypted); } - mWriteQueue.push(encrypted); + mWriteQueue.add(encrypted); - if (mAckQueue.size() < MAX_UNACK_MESSAGES) { + if (mWriteQueue.size() + mAckQueue.size() < MAX_UNACK_MESSAGES) { return write_loop(next, request); // write additional chunks } return false; } else { EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request); - mAckQueue.add(encrypted); - + if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); + LOGGER.debug("{} write_loop() - result {}", toString(), encrypted); } - mWriteQueue.push(encrypted); + mWriteQueue.add(encrypted); return true; } @@ -469,7 +449,7 @@ import java.util.concurrent.Executor; switch (result.getHandshakeStatus()) { case NEED_TASK: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write_user_loop() - handshake needs task, scheduling", toString()); + LOGGER.debug("{} write_loop() - handshake needs task, scheduling", toString()); } schedule_task(next); @@ -477,14 +457,14 @@ import java.util.concurrent.Executor; case NEED_WRAP: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write_user_loop() - handshake needs wrap, looping", toString()); + LOGGER.debug("{} write_loop() - handshake needs wrap, looping", toString()); } return write_loop(next, request); case FINISHED: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write_user_loop() - handshake finished, flushing queue", toString()); + LOGGER.debug("{} write_loop() - handshake finished, flushing queue", toString()); } finish_handshake(next); @@ -581,7 +561,7 @@ import java.util.concurrent.Executor; } EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); - mWriteQueue.push(encrypted); + mWriteQueue.add(encrypted); } switch (result.getHandshakeStatus()) { @@ -641,14 +621,10 @@ import java.util.concurrent.Executor; * {@inheritDoc} */ public void flush(NextFilter next) throws SSLException { - flush_start(next); - synchronized (mWriteQueue) { - EncryptedWriteRequest x; - while((x = mWriteQueue.poll()) != null) { - next.filterWrite(mSession, x); - } - } - synchronized (this) { + try { + flush_start(next); + } finally { + forward_writes(next); throw_pending_error(next); } } @@ -701,14 +677,10 @@ import java.util.concurrent.Executor; */ @Override public void close(NextFilter next, boolean linger) throws SSLException { - close_start(next, linger); - synchronized (mWriteQueue) { - EncryptedWriteRequest x; - while((x = mWriteQueue.poll()) != null) { - next.filterWrite(mSession, x); - } - } - synchronized (this) { + try { + close_start(next, linger); + } finally { + forward_writes(next); throw_pending_error(next); } } @@ -747,13 +719,13 @@ import java.util.concurrent.Executor; */ synchronized protected void throw_pending_error(NextFilter next) throws SSLException { SSLException sslException = mPendingError; - if (sslException != null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} throw_pending_error() - throwing pending error"); + } // Loop to send back the alert messages receive_loop(next, null); - mPendingError = null; - // And finally rethrow the exception throw sslException; } @@ -770,6 +742,31 @@ import java.util.concurrent.Executor; } } + protected void forward_received(NextFilter next) { + synchronized (mReceiveQueue) { + IoBuffer x; + while ((x = mReceiveQueue.poll()) != null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} forward_received() - received {}", toString(), x); + } + next.messageReceived(mSession, x); + } + } + } + + protected void forward_writes(NextFilter next) { + synchronized (mWriteQueue) { + EncryptedWriteRequest x; + while ((x = mWriteQueue.poll()) != null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} forward_writes() - writing {}", toString(), x); + } + mAckQueue.add(x); + next.filterWrite(mSession, x); + } + } + } + /** * Schedule a SSLEngine task for execution, either using an Executor, or immediately. * @@ -792,7 +789,6 @@ import java.util.concurrent.Executor; */ synchronized protected void execute_task(NextFilter next) { Runnable task; - while ((task = mEngine.getDelegatedTask()) != null) { try { if (LOGGER.isDebugEnabled()) { diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java index 90329f219..c6340f2a0 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java @@ -447,15 +447,14 @@ public class SslFilter extends IoFilterAdapter { */ @Override public void messageSent(NextFilter next, IoSession session, WriteRequest request) throws Exception { - if (LOGGER.isDebugEnabled()) { - if (session.isServer()) { - LOGGER.debug("SERVER: Session {} ack {}", session, request); - } else { - LOGGER.debug("CLIENT: Session {} ack {}", session, request); - } - } - if (request instanceof EncryptedWriteRequest) { + if (LOGGER.isDebugEnabled()) { + if (session.isServer()) { + LOGGER.debug("SERVER: Session {} ack {}", session, request); + } else { + LOGGER.debug("CLIENT: Session {} ack {}", session, request); + } + } EncryptedWriteRequest encryptedWriteRequest = EncryptedWriteRequest.class.cast(request); SslHandler sslHandler = getSslHandler(session); sslHandler.ack(next, request);