This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch bugfix/DIRMINA-1173 in repository https://gitbox.apache.org/repos/asf/mina.git
commit 0cbbc8b2ef2a10d6c0ce9f2101fb0ed1b51466a8 Author: Jonathan Valliere <jon.valli...@emoten.com> AuthorDate: Mon Feb 19 21:19:21 2024 -0500 working on the multi-phase synchronization; it was working before but is currently broken --- .../org/apache/mina/filter/ssl/SSLHandlerG0.java | 2 +- .../ssl/{SSLHandlerG0.java => SSLHandlerG1.java} | 288 ++++++++++++--------- .../java/org/apache/mina/filter/ssl/SslFilter.java | 19 +- 3 files changed, 188 insertions(+), 121 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java index 3109426a2..72f3e2bdb 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java @@ -136,7 +136,7 @@ import org.apache.mina.core.write.WriteRequest; if (mEngine.getUseClientMode()) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} open() - begin handshaking", toString()); + LOGGER.debug("{} open() - begin handshaking", this); } mEngine.beginHandshake(); diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java similarity index 78% copy from mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java copy to mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java index 3109426a2..5cb6858ea 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java @@ -19,20 +19,21 @@ */ package org.apache.mina.filter.ssl; -import java.nio.BufferOverflowException; -import java.util.ArrayList; -import java.util.concurrent.Executor; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLException; - import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.filterchain.IoFilter.NextFilter; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.write.WriteRejectedException; import org.apache.mina.core.write.WriteRequest; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import java.nio.BufferOverflowException; +import java.util.ArrayList; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executor; + /** * Default implementation of SSLHandler * <p> @@ -42,7 +43,7 @@ import org.apache.mina.core.write.WriteRequest; * @author Jonathan Valliere * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ -/* package protected */ class SSLHandlerG0 extends SslHandler { +/* package protected */ class SSLHandlerG1 extends SslHandler { /** * Maximum number of queued messages waiting for encoding @@ -67,7 +68,7 @@ import org.apache.mina.core.write.WriteRequest; /** * Enable asynchronous tasks */ - static protected final boolean ENABLE_ASYNC_TASKS = true; + static protected final boolean ENABLE_ASYNC_TASKS = false; /** * Indicates whether the first handshake was completed @@ -90,9 +91,19 @@ import org.apache.mina.core.write.WriteRequest; protected boolean mOutboundLinger = false; /** - * Holds the decoder thread reference; used for recursion detection + * Holds the decoder thread reference; used for recursion detection introduced by a delegated task + */ + protected volatile Thread mReceiveThread = null; + + /** + * Encoded buffers ready for processing upstream + */ + protected final Deque<EncryptedWriteRequest> mWriteQueue = new ConcurrentLinkedDeque<>(); + + /** + * Decoded buffers ready for processing downstream */ - protected Thread mDecodeThread = null; + protected final Deque<IoBuffer> mReceiveQueue = new ConcurrentLinkedDeque<>(); /** * Captured error state @@ -101,12 +112,12 @@ import org.apache.mina.core.write.WriteRequest; /** * Instantiates a new handler - * + * * @param sslEngine The SSLEngine instance * @param executor The executor instance to use to process tasks * @param session The session to handle */ - public SSLHandlerG0(SSLEngine sslEngine, Executor executor, IoSession session) { + public SSLHandlerG1(SSLEngine sslEngine, Executor executor, IoSession session) { super(sslEngine, executor, session); } @@ -130,49 +141,72 @@ import org.apache.mina.core.write.WriteRequest; * {@inheritDoc} */ @Override - synchronized public void open(NextFilter next) throws SSLException { - if (mHandshakeStarted == false) { - mHandshakeStarted = true; - - if (mEngine.getUseClientMode()) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} open() - begin handshaking", toString()); + public void open(NextFilter next) throws SSLException { + synchronized (this) { + if (mHandshakeStarted == false) { + mHandshakeStarted = true; + if (mEngine.getUseClientMode()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} open() - begin handshaking", this); + } + mEngine.beginHandshake(); + write_handshake(next); } - - mEngine.beginHandshake(); - write_handshake(next); } } + synchronized (mWriteQueue) { + EncryptedWriteRequest x; + while((x = mWriteQueue.poll()) != null) { + next.filterWrite(mSession, x); + } + } + synchronized (this) { + throw_pending_error(next); + } } /** * {@inheritDoc} */ @Override - synchronized public void receive(NextFilter next, IoBuffer message) throws SSLException { - if (mDecodeThread == null) { + public void receive(NextFilter next, IoBuffer message) throws SSLException { + receive_start(next, message); + synchronized (mReceiveQueue) { + IoBuffer x; + while((x = mReceiveQueue.poll()) != null) { + next.messageReceived(mSession, x); + } + } + synchronized (mWriteQueue) { + EncryptedWriteRequest x; + while((x = mWriteQueue.poll()) != null) { + next.filterWrite(mSession, x); + } + } + synchronized (this) { + throw_pending_error(next); + } + } + + synchronized protected void receive_start(NextFilter next, IoBuffer message) throws SSLException { + if(mReceiveThread == Thread.currentThread()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} receive() - recursion", toString()); + } + receive_loop(next, mDecodeBuffer); + } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} receive() - message {}", toString(), message); } - - mDecodeThread = Thread.currentThread(); + mReceiveThread = Thread.currentThread(); IoBuffer source = resume_decode_buffer(message); - try { receive_loop(next, source); } finally { suspend_decode_buffer(source); - mDecodeThread = null; + mReceiveThread = null; } - } else { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} receive() - recursion", toString()); - } - - receive_loop(next, mDecodeBuffer); } - - throw_pending_error(next); } /** @@ -232,8 +266,8 @@ import org.apache.mina.core.write.WriteRequest; if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} receive_loop() - result {}", toString(), dest); } - - next.messageReceived(mSession, dest); + + mReceiveQueue.push(dest); } switch (result.getHandshakeStatus()) { @@ -288,61 +322,72 @@ import org.apache.mina.core.write.WriteRequest; * {@inheritDoc} */ @Override - synchronized public void ack(NextFilter next, WriteRequest request) throws SSLException { - if (mAckQueue.remove(request)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} ack() - {}", toString(), request); + public void ack(NextFilter next, WriteRequest request) throws SSLException { + synchronized (this) { + if (mAckQueue.remove(request)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} ack() - {}", toString(), request); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} ack() - checking to see if any messages can be flushed", toString(), request); + } + flush_start(next); } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} ack() - checking to see if any messages can be flushed", toString(), request); + } + synchronized (mWriteQueue) { + EncryptedWriteRequest x; + while((x = mWriteQueue.poll()) != null) { + next.filterWrite(mSession, x); } - - flush(next); } - - throw_pending_error(next); + synchronized (this) { + throw_pending_error(next); + } } /** * {@inheritDoc} */ @Override - synchronized public void write(NextFilter next, WriteRequest request) throws SSLException, WriteRejectedException { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write() - source {}", toString(), request); - } - - if (mOutboundClosing) { - throw new WriteRejectedException(request, "closing"); - } - - if (mEncodeQueue.isEmpty()) { - if (write_user_loop(next, request) == false) { + public void write(NextFilter next, WriteRequest request) throws SSLException, WriteRejectedException { + synchronized (this) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} write() - source {}", toString(), request); + } + if (mOutboundClosing) { + throw new WriteRejectedException(request, "closing"); + } + if (mEncodeQueue.isEmpty()) { + if (write_loop(next, request) == false) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), + request); + } + if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) { + throw new BufferOverflowException(); + } + mEncodeQueue.add(request); + } + } else { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), - request); + LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request); } - if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) { throw new BufferOverflowException(); } - mEncodeQueue.add(request); } - } else { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request); - } - - if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) { - throw new BufferOverflowException(); + } + synchronized (mWriteQueue) { + EncryptedWriteRequest x; + while((x = mWriteQueue.poll()) != null) { + next.filterWrite(mSession, x); } - - mEncodeQueue.add(request); } - - throw_pending_error(next); + synchronized (this) { + throw_pending_error(next); + } } /** @@ -357,7 +402,7 @@ import org.apache.mina.core.write.WriteRequest; * @throws SSLException */ @SuppressWarnings("incomplete-switch") - synchronized protected boolean write_user_loop(NextFilter next, WriteRequest request) throws SSLException { + synchronized protected boolean write_loop(NextFilter next, WriteRequest request) throws SSLException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write_user_loop() - source {}", toString(), request); } @@ -383,8 +428,8 @@ import org.apache.mina.core.write.WriteRequest; if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); } - - next.filterWrite(mSession, encrypted); + + mWriteQueue.push(encrypted); // do not return because we want to enter the handshake switch } else { // then we probably consumed some data @@ -397,11 +442,11 @@ import org.apache.mina.core.write.WriteRequest; if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); } - - next.filterWrite(mSession, encrypted); + + mWriteQueue.push(encrypted); if (mAckQueue.size() < MAX_UNACK_MESSAGES) { - return write_user_loop(next, request); // write additional chunks + return write_loop(next, request); // write additional chunks } return false; @@ -412,8 +457,8 @@ import org.apache.mina.core.write.WriteRequest; if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); } - - next.filterWrite(mSession, encrypted); + + mWriteQueue.push(encrypted); return true; } @@ -435,7 +480,7 @@ import org.apache.mina.core.write.WriteRequest; LOGGER.debug("{} write_user_loop() - handshake needs wrap, looping", toString()); } - return write_user_loop(next, request); + return write_loop(next, request); case FINISHED: if (LOGGER.isDebugEnabled()) { @@ -444,7 +489,7 @@ import org.apache.mina.core.write.WriteRequest; finish_handshake(next); - return write_user_loop(next, request); + return write_loop(next, request); } return false; @@ -536,7 +581,7 @@ import org.apache.mina.core.write.WriteRequest; } EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); - next.filterWrite(mSession, encrypted); + mWriteQueue.push(encrypted); } switch (result.getHandshakeStatus()) { @@ -544,15 +589,13 @@ import org.apache.mina.core.write.WriteRequest; if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write_handshake_loop() - handshake needs unwrap, invoking receive", toString()); } - - receive(next, ZERO); + receive_start(next, ZERO); break; case NEED_WRAP: if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, looping", toString()); } - write_handshake(next); break; @@ -560,7 +603,6 @@ import org.apache.mina.core.write.WriteRequest; if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write_handshake_loop() - handshake needs task, scheduling", toString()); } - schedule_task(next); break; @@ -568,7 +610,6 @@ import org.apache.mina.core.write.WriteRequest; if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write_handshake_loop() - handshake finished, flushing queue", toString()); } - finish_handshake(next); break; } @@ -592,8 +633,24 @@ import org.apache.mina.core.write.WriteRequest; /** * There exists a bug in the JDK which emits FINISHED twice instead of once. */ - receive(next, ZERO); - flush(next); + receive_start(next, ZERO); + flush_start(next); + } + + /** + * {@inheritDoc} + */ + public void flush(NextFilter next) throws SSLException { + flush_start(next); + synchronized (mWriteQueue) { + EncryptedWriteRequest x; + while((x = mWriteQueue.poll()) != null) { + next.filterWrite(mSession, x); + } + } + synchronized (this) { + throw_pending_error(next); + } } /** @@ -603,7 +660,7 @@ import org.apache.mina.core.write.WriteRequest; * * @throws SSLException */ - synchronized public void flush(NextFilter next) throws SSLException { + synchronized protected void flush_start(NextFilter next) throws SSLException { if (mOutboundClosing && mOutboundLinger == false) { return; } @@ -623,7 +680,7 @@ import org.apache.mina.core.write.WriteRequest; LOGGER.debug("{} flush() - {}", toString(), current); } - if (write_user_loop(next, current) == false) { + if (write_loop(next, current) == false) { mEncodeQueue.addFirst(current); break; @@ -643,35 +700,42 @@ import org.apache.mina.core.write.WriteRequest; * {@inheritDoc} */ @Override - synchronized public void close(NextFilter next, boolean linger) throws SSLException { + public void close(NextFilter next, boolean linger) throws SSLException { + close_start(next, linger); + synchronized (mWriteQueue) { + EncryptedWriteRequest x; + while((x = mWriteQueue.poll()) != null) { + next.filterWrite(mSession, x); + } + } + synchronized (this) { + throw_pending_error(next); + } + } + + synchronized protected void close_start(NextFilter next, boolean linger) throws SSLException { if (mOutboundClosing) { return; } - if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} close() - closing session", toString()); } - if (mHandshakeComplete) { next.event(mSession, SslEvent.UNSECURED); } - mOutboundLinger = linger; mOutboundClosing = true; - if (linger == false) { if (mEncodeQueue.size() != 0) { next.exceptionCaught(mSession, new WriteRejectedException(new ArrayList<>(mEncodeQueue), "closing")); mEncodeQueue.clear(); } - mEngine.closeOutbound(); - if (ENABLE_SOFT_CLOSURE) { write_handshake(next); } } else { - flush(next); + flush_start(next); } } @@ -713,12 +777,7 @@ import org.apache.mina.core.write.WriteRequest; */ protected void schedule_task(NextFilter next) { if (ENABLE_ASYNC_TASKS && (mExecutor != null)) { - mExecutor.execute(new Runnable() { - @Override - public void run() { - SSLHandlerG0.this.execute_task(next); - } - }); + mExecutor.execute(() -> SSLHandlerG1.this.execute_task(next)); } else { execute_task(next); } @@ -732,30 +791,25 @@ import org.apache.mina.core.write.WriteRequest; * @param next The next filer in the chain */ synchronized protected void execute_task(NextFilter next) { - Runnable task = null; + Runnable task; while ((task = mEngine.getDelegatedTask()) != null) { try { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} task() - executing {}", toString(), task); } - task.run(); - if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} task() - writing handshake messages", toString()); } - write_handshake(next); } catch (SSLException e) { store_pending_error(e); - try { throw_pending_error(next); } catch ( SSLException ssle) { // ... } - if (LOGGER.isErrorEnabled()) { LOGGER.error("{} task() - storing error {}", toString(), e); } diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java index 418be5ab2..90329f219 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java @@ -60,7 +60,7 @@ public class SslFilter extends IoFilterAdapter { /** * Returns the SSL2Handler object */ - static protected final AttributeKey SSL_HANDLER = new AttributeKey(SslFilter.class, "handler"); + static protected final AttributeKey SSL_HANDLER = new AttributeKey(SslHandler.class, "handler"); /** * The logger @@ -74,13 +74,18 @@ public class SslFilter extends IoFilterAdapter { new LinkedBlockingDeque<>(), new BasicThreadFactory("ssl-exec", true)); protected final SSLContext sslContext; - + /** A flag used to tell the filter to start the handshake immediately (in onPostAdd method) * alternatively handshake will be started after session is connected (in sessionOpened method) * default value is true **/ private final boolean autoStart; + /** + * Enables the non-blocking IO + */ + private boolean nonBlock = true; + /** A flag set if client authentication is required */ protected boolean needClientAuth = false; @@ -135,6 +140,10 @@ public class SslFilter extends IoFilterAdapter { this.autoStart = autoStart; } + public void setNonBlocking(boolean enable) { + this.nonBlock = enable; + } + /** * @return <code>true</code> if the engine will <em>require</em> client * authentication. This option is only useful to engines in the server @@ -299,7 +308,11 @@ public class SslFilter extends IoFilterAdapter { if (sslHandler == null) { InetSocketAddress s = InetSocketAddress.class.cast(session.getRemoteAddress()); SSLEngine sslEngine = createEngine(session, s); - sslHandler = new SSLHandlerG0(sslEngine, EXECUTOR, session); + if(this.nonBlock){ + sslHandler = new SSLHandlerG1(sslEngine, EXECUTOR, session); + }else { + sslHandler = new SSLHandlerG0(sslEngine, EXECUTOR, session); + } session.setAttribute(SSL_HANDLER, sslHandler); }