This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch bugfix/DIRMINA1132 in repository https://gitbox.apache.org/repos/asf/mina.git
commit d64c8c7fd46cc14cf37aa5505860913925311d17 Author: Jonathan Valliere <john...@apache.org> AuthorDate: Sat Jul 24 13:33:57 2021 -0400 Adds SSL2 flow control --- .../mina/filter/ssl2/EncryptedWriteRequest.java | 23 +--- .../org/apache/mina/filter/ssl2/SSL2Filter.java | 83 ++++++++------- .../org/apache/mina/filter/ssl2/SSL2Handler.java | 20 +++- .../org/apache/mina/filter/ssl2/SSL2HandlerG0.java | 116 ++++++++++++++------- .../apache/mina/filter/ssl2/SSL2SimpleTest.java | 21 ++-- 5 files changed, 157 insertions(+), 106 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java index 91fabc7..caf32d7 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/EncryptedWriteRequest.java @@ -1,32 +1,19 @@ package org.apache.mina.filter.ssl2; -import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.write.DefaultWriteRequest; import org.apache.mina.core.write.WriteRequest; public class EncryptedWriteRequest extends DefaultWriteRequest { // The original message - private WriteRequest parentRequest; + private WriteRequest originalRequest; public EncryptedWriteRequest(Object encodedMessage, WriteRequest parent) { - super(encodedMessage, null); + super(encodedMessage, parent != null ? parent.getFuture() : null); + this.originalRequest = parent != null ? parent : this; } - /** - * {@inheritDoc} - */ - @Override - public boolean isEncoded() { - return true; - } - - public WriteRequest getParentRequest() { - return this.parentRequest; - } - - @Override - public WriteFuture getFuture() { - return (this.getParentRequest() != null) ? this.getParentRequest().getFuture() : super.getFuture(); + public WriteRequest getOriginalRequest() { + return this.originalRequest; } } \ No newline at end of file diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java index 052f806..80e5688 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Filter.java @@ -19,6 +19,7 @@ */ package org.apache.mina.filter.ssl2; +import java.net.InetSocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; @@ -38,16 +39,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * An SSL filter that encrypts and decrypts the data exchanged in the session. - * Adding this filter triggers SSL handshake procedure immediately by sending a - * SSL 'hello' message, so you don't need to call {@link #startSsl(IoSession)} - * manually unless you are implementing StartTLS (see below). If you don't want - * the handshake procedure to start immediately, please specify {@code false} as - * {@code autoStart} parameter in the constructor. + * An SSL Filter which simplifies and controls the flow of encrypted information + * on the filter-chain. * <p> - * This filter uses an {@link SSLEngine} which was introduced in Java 5, so Java - * version 5 or above is mandatory to use this filter. And please note that this - * filter only works for TCP/IP connections. + * The initial handshake is automatically enabled for "client" sessions once the + * filter is added to the filter-chain and the session is connected. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ @@ -160,15 +156,6 @@ public class SSL2Filter extends IoFilterAdapter { this.mEnabledProtocols = protocols; } - /** - * Executed just before the filter is added into the chain, we do : - * <ul> - * <li>check that we don't have a SSL filter already present - * <li>we update the next filter - * <li>we create the SSL handler helper class - * <li>and we store it into the session's Attributes - * </ul> - */ @Override public void onPreAdd(IoFilterChain parent, String name, NextFilter next) throws Exception { // Check that we don't have a SSL filter already present in the chain @@ -184,34 +171,47 @@ public class SSL2Filter extends IoFilterAdapter { } @Override - public void onPreRemove(IoFilterChain parent, String name, NextFilter next) throws Exception { + public void onPostAdd(IoFilterChain parent, String name, NextFilter next) throws Exception { IoSession session = parent.getSession(); - session.removeAttribute(SSL_HANDLER); + if (session.isConnected()) { + this.sessionConnected(next, session); + } + super.onPostAdd(parent, name, next); } @Override - public void sessionOpened(NextFilter next, IoSession session) throws Exception { - - LOGGER.debug("session openend {}", session); + public void onPreRemove(IoFilterChain parent, String name, NextFilter next) throws Exception { + IoSession session = parent.getSession(); + SSL2Handler x = SSL2Handler.class.cast(session.removeAttribute(SSL_HANDLER)); + if (x != null) { + x.close(next); + } + } + protected void sessionConnected(NextFilter next, IoSession session) throws Exception { SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER)); if (x == null) { - SSLEngine e = mContext.createSSLEngine(); - + InetSocketAddress s = InetSocketAddress.class.cast(session.getRemoteAddress()); + SSLEngine e = mContext.createSSLEngine(s.getHostString(), s.getPort()); e.setNeedClientAuth(mNeedClientAuth); e.setWantClientAuth(mWantClientAuth); e.setEnabledCipherSuites(mEnabledCipherSuites); e.setEnabledProtocols(mEnabledProtocols); e.setUseClientMode(!session.isServer()); - x = new SSL2HandlerG0(e, EXECUTOR, session); - session.setAttribute(SSL_HANDLER, x); } x.open(next); - + } + + @Override + public void sessionOpened(NextFilter next, IoSession session) throws Exception { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("session {} openend", session); + + this.sessionConnected(next, session); super.sessionOpened(next, session); } @@ -222,29 +222,32 @@ public class SSL2Filter extends IoFilterAdapter { } @Override - public void messageSent(NextFilter next, IoSession session, WriteRequest writeRequest) throws Exception { - if (writeRequest instanceof EncryptedWriteRequest) { - EncryptedWriteRequest e = EncryptedWriteRequest.class.cast(writeRequest); + public void messageSent(NextFilter next, IoSession session, WriteRequest request) throws Exception { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("session {} sent {}", session, request); + + if (request instanceof EncryptedWriteRequest) { + EncryptedWriteRequest e = EncryptedWriteRequest.class.cast(request); SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER)); - x.ack(next, writeRequest); - if (e.getParentRequest() != null) { - next.messageSent(session, e.getParentRequest()); + x.ack(next, request); + if (e.getOriginalRequest() != e) { + next.messageSent(session, e.getOriginalRequest()); } } else { - super.messageSent(next, session, writeRequest); + super.messageSent(next, session, request); } } @Override - public void filterWrite(NextFilter next, IoSession session, WriteRequest writeRequest) throws Exception { + public void filterWrite(NextFilter next, IoSession session, WriteRequest request) throws Exception { - LOGGER.debug("session write {}", session); + LOGGER.debug("session {} write {}", session, request); - if (writeRequest instanceof EncryptedWriteRequest) { - super.filterWrite(next, session, writeRequest); + if (request instanceof EncryptedWriteRequest) { + super.filterWrite(next, session, request); } else { SSL2Handler x = SSL2Handler.class.cast(session.getAttribute(SSL_HANDLER)); - x.write(next, writeRequest); + x.write(next, request); } } } diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java index 1e8e59b..d8eb1eb 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2Handler.java @@ -18,6 +18,21 @@ import org.slf4j.LoggerFactory; public abstract class SSL2Handler { /** + * Minimum size of encoder buffer in packets + */ + static protected final int MIN_ENCODER_PACKETS = 2; + + /** + * Maximum size of encoder buffer in packets + */ + static protected final int MAX_ENCODER_PACKETS = 8; + + /** + * Zero length buffer used to prime the ssl engine + */ + static protected final IoBuffer ZERO = IoBuffer.allocate(0, true); + + /** * Static logger */ static protected final Logger LOGGER = LoggerFactory.getLogger(SSL2Handler.class); @@ -25,7 +40,7 @@ public abstract class SSL2Handler { /** * Write Requests which are enqueued prior to the completion of the handshaking */ - protected final Deque<WriteRequest> mWriteQueue = new ConcurrentLinkedDeque<>(); + protected final Deque<WriteRequest> mEncodeQueue = new ConcurrentLinkedDeque<>(); /** * Requests which have been sent to the socket and waiting acknowledgment @@ -200,7 +215,8 @@ public abstract class SSL2Handler { SSLSession session = this.mEngine.getHandshakeSession(); if (session == null) session = this.mEngine.getSession(); - int packets = Math.max(2, Math.min(16, 1 + (estimate / session.getApplicationBufferSize()))); + int packets = Math.max(MIN_ENCODER_PACKETS, + Math.min(MAX_ENCODER_PACKETS, 1 + (estimate / session.getApplicationBufferSize()))); return IoBuffer.allocate(packets * session.getPacketBufferSize()); } diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java index 803927f..9961a32 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java @@ -4,7 +4,6 @@ import java.util.concurrent.Executor; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLException; import org.apache.mina.core.buffer.IoBuffer; @@ -14,36 +13,42 @@ import org.apache.mina.core.write.WriteRequest; public class SSL2HandlerG0 extends SSL2Handler { + /** + * Maximum number of messages waiting acknowledgement + */ + static protected final int MAX_UNACK_MESSAGES = 6; + public SSL2HandlerG0(SSLEngine p, Executor e, IoSession s) { super(p, e, s); } + /** + * {@inheritDoc} + */ synchronized public void open(final NextFilter next) throws SSLException { if (this.mEngine.getUseClientMode()) { - if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} open() - begin handshaking", toString()); } - this.mEngine.beginHandshake(); this.lwrite(next); } } + /** + * {@inheritDoc} + */ synchronized public void receive(final NextFilter next, final IoBuffer message) throws SSLException { - + final IoBuffer source = resume_decode_buffer(message); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} receive() - source {}", toString(), message); + LOGGER.debug("{} receive() - source {}", toString(), source); } - - final IoBuffer input = resume_decode_buffer(message); - try { - while (lreceive(next, input) && message.hasRemaining()) { - // spin + while (lreceive(next, source) && message.hasRemaining()) { + // loop until the message is consumed } } finally { - save_decode_buffer(input); + save_decode_buffer(source); } } @@ -63,7 +68,7 @@ public class SSL2HandlerG0 extends SSL2Handler { LOGGER.debug("{} lreceive() - source {}", toString(), message); } - final IoBuffer source = message == null ? IoBuffer.allocate(0) : message; + final IoBuffer source = message == null ? ZERO : message; final IoBuffer dest = allocate_app_buffer(source.remaining()); final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf()); @@ -77,11 +82,9 @@ public class SSL2HandlerG0 extends SSL2Handler { dest.free(); } else { dest.flip(); - if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} lreceive() - result {}", toString(), dest); } - next.messageReceived(this.mSession, dest); } @@ -109,30 +112,42 @@ public class SSL2HandlerG0 extends SSL2Handler { return result.bytesConsumed() > 0; } + /** + * {@inheritDoc} + */ synchronized public void ack(final NextFilter next, final WriteRequest request) throws SSLException { - + if (this.mAckQueue.remove(request)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} ack() - {}", toString(), request); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} ack() - checking to see if any messages can be flushed", toString(), request); + } + this.lflush(next); + } } + /** + * {@inheritDoc} + */ synchronized public void write(final NextFilter next, final WriteRequest request) throws SSLException { - if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write() - source {}", toString(), request); } - if (this.mWriteQueue.isEmpty()) { + if (this.mEncodeQueue.isEmpty()) { if (lwrite(next, request) == false) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request); + LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), + request); } - - this.mWriteQueue.add(request); + this.mEncodeQueue.add(request); } } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request); } - - this.mWriteQueue.add(request); + this.mEncodeQueue.add(request); } } @@ -142,7 +157,8 @@ public class SSL2HandlerG0 extends SSL2Handler { * @param request * @param session * @param next - * @return {@code true} if the WriteRequest was successfully written + * @return {@code true} if the WriteRequest was fully consumed; otherwise + * {@code false} * @throws SSLException */ @SuppressWarnings("incomplete-switch") @@ -166,19 +182,35 @@ public class SSL2HandlerG0 extends SSL2Handler { dest.free(); } else { if (result.bytesConsumed() == 0) { - next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, null)); + EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} lwrite() - result {}", toString(), encrypted); + } + next.filterWrite(this.mSession, encrypted); } else { // then we probably consumed some data dest.flip(); if (source.hasRemaining()) { - next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, null)); - lwrite(next, request); // write additional chunks + EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); + this.mAckQueue.add(encrypted); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} lwrite() - result {}", toString(), encrypted); + } + next.filterWrite(this.mSession, encrypted); + if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) { + return lwrite(next, request); // write additional chunks + } + return false; } else { source.rewind(); - next.filterWrite(this.mSession, new EncryptedWriteRequest(dest, request)); + EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request); + this.mAckQueue.add(encrypted); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} lwrite() - result {}", toString(), encrypted); + } + next.filterWrite(this.mSession, encrypted); + return true; } - - return true; } } @@ -206,7 +238,6 @@ public class SSL2HandlerG0 extends SSL2Handler { } return false; - } /** @@ -224,7 +255,7 @@ public class SSL2HandlerG0 extends SSL2Handler { LOGGER.debug("{} lwrite() - internal", toString()); } - final IoBuffer source = IoBuffer.allocate(0); + final IoBuffer source = ZERO; final IoBuffer dest = allocate_encode_buffer(source.remaining()); final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf()); @@ -234,15 +265,13 @@ public class SSL2HandlerG0 extends SSL2Handler { result.bytesProduced()); } - if (dest.position() == 0) { + if (result.bytesProduced() == 0) { dest.free(); } else { dest.flip(); - if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} lwrite() - result {}", toString(), dest); } - final EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); next.filterWrite(this.mSession, encrypted); } @@ -271,8 +300,14 @@ public class SSL2HandlerG0 extends SSL2Handler { return result.bytesProduced() > 0; } + /** + * Flushes the encode queue + * + * @param next + * @throws SSLException + */ synchronized protected void lflush(final NextFilter next) throws SSLException { - if (this.mWriteQueue.isEmpty()) { + if (this.mEncodeQueue.isEmpty()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} flush() - no saved messages", toString()); } @@ -280,15 +315,20 @@ public class SSL2HandlerG0 extends SSL2Handler { } WriteRequest current = null; - - while ((current = this.mWriteQueue.poll()) != null) { + while ((this.mAckQueue.size() < MAX_UNACK_MESSAGES) && (current = this.mEncodeQueue.poll()) != null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} flush() - {}", toString(), current); + } if (lwrite(next, current) == false) { - this.mWriteQueue.addFirst(current); + this.mEncodeQueue.addFirst(current); break; } } } + /** + * {@inheritDoc} + */ synchronized public void close(final NextFilter next) throws SSLException { if (mEngine.isOutboundDone()) return; diff --git a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java index beb6577..aef4ead 100644 --- a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java +++ b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java @@ -77,13 +77,7 @@ public class SSL2SimpleTest { // // } - client_socket.write(createWriteRequest()); - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - - } + client_socket.write(createMosaicRequest()).awaitUninterruptibly(); client_socket.closeNow(); @@ -104,7 +98,18 @@ public class SSL2SimpleTest { } } - public static IoBuffer createWriteRequest() { + public static IoBuffer createMosaicRequest() { + // HTTP request + IoBuffer message = IoBuffer.allocate(100 * 1024); + while (message.hasRemaining()) { + message.putInt(0xFF332211); + } + message.flip(); + + return message; + } + + public static IoBuffer createHttpRequest() { // HTTP request StringBuilder http = new StringBuilder(); http.append("GET / HTTP/1.0\r\n");