This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch bugfix/DIRMINA1132 in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/bugfix/DIRMINA1132 by this push: new 3dc8265 Improves loop encode/decode functions 3dc8265 is described below commit 3dc8265745a2b448597ce5de43a58bdb4e2499bb Author: Jonathan Valliere <john...@apache.org> AuthorDate: Sun Jul 25 12:02:46 2021 -0400 Improves loop encode/decode functions --- .../org/apache/mina/filter/ssl2/SSL2HandlerG0.java | 209 +++++++++++++++------ .../apache/mina/filter/ssl2/SSL2SimpleTest.java | 22 +-- 2 files changed, 160 insertions(+), 71 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java index 2f8300d..8f4e8d6 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl2/SSL2HandlerG0.java @@ -20,10 +20,25 @@ public class SSL2HandlerG0 extends SSL2Handler { static protected final int MAX_UNACK_MESSAGES = 6; /** + * Enable aggregation of handshake messages + */ + static protected final boolean ENABLE_FAST_HANDSHAKE = true; + + /** + * Enable asynchronous tasks + */ + static protected final boolean ENABLE_ASYNC_TASKS = true; + + /** * Indicates whether the first handshake was completed */ protected boolean mHandshakeComplete = false; + /** + * Indicates whether the first handshake was started + */ + protected boolean mHandshakeStarted = false; + public SSL2HandlerG0(SSLEngine p, Executor e, IoSession s) { super(p, e, s); } @@ -48,12 +63,15 @@ public class SSL2HandlerG0 extends SSL2Handler { * {@inheritDoc} */ synchronized public void open(final NextFilter next) throws SSLException { - if (this.mEngine.getUseClientMode()) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} open() - begin handshaking", toString()); + if (this.mHandshakeStarted == false) { + this.mHandshakeStarted = true; + if (this.mEngine.getUseClientMode()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} open() - begin 
handshaking", toString()); + } + this.mEngine.beginHandshake(); + this.qwrite(next); } - this.mEngine.beginHandshake(); - this.lwrite(next); } } @@ -61,13 +79,13 @@ public class SSL2HandlerG0 extends SSL2Handler { * {@inheritDoc} */ synchronized public void receive(final NextFilter next, final IoBuffer message) throws SSLException { - final IoBuffer source = resume_decode_buffer(message); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} receive() - source {}", toString(), source); + LOGGER.debug("{} receive() - message {}", toString(), message); } + final IoBuffer source = resume_decode_buffer(message); try { - while (lreceive(next, source) && message.hasRemaining()) { - // loop until the message is consumed + if (source.hasRemaining()) { + this.qreceive(next, source); } } finally { save_decode_buffer(source); @@ -80,14 +98,12 @@ public class SSL2HandlerG0 extends SSL2Handler { * @param message received data * @param session user session * @param next filter - * @return {@code true} if some of the message was consumed * @throws SSLException */ @SuppressWarnings("incomplete-switch") - protected boolean lreceive(final NextFilter next, final IoBuffer message) throws SSLException { - + protected void qreceive(final NextFilter next, final IoBuffer message) throws SSLException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lreceive() - source {}", toString(), message); + LOGGER.debug("{} qreceive() - source {}", toString(), message); } final IoBuffer source = message == null ? 
ZERO : message; @@ -96,43 +112,60 @@ public class SSL2HandlerG0 extends SSL2Handler { final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lreceive() - bytes-consumed {}, bytes-produced {}, status {}", toString(), - result.bytesConsumed(), result.bytesProduced(), result.getStatus()); + LOGGER.debug("{} qreceive() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), + result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus()); } + final boolean success = result.bytesConsumed() != 0; + if (result.bytesProduced() == 0) { dest.free(); } else { dest.flip(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lreceive() - result {}", toString(), dest); + LOGGER.debug("{} qreceive() - result {}", toString(), dest); } next.messageReceived(this.mSession, dest); } switch (result.getHandshakeStatus()) { + case NEED_UNWRAP: + case NEED_UNWRAP_AGAIN: + if (success && source.hasRemaining()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} qreceive() - handshake needs unwrap, looping", toString()); + } + this.qreceive(next, message); + } + break; case NEED_TASK: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lreceive() - handshake needs task, scheduling tasks", toString()); + LOGGER.debug("{} qreceive() - handshake needs task, scheduling", toString()); } this.schedule_task(next); break; case NEED_WRAP: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lreceive() - handshake needs to write a new message", toString()); + LOGGER.debug("{} qreceive() - handshake needs wrap, invoking write", toString()); } - this.lwrite(next); + this.qwrite(next); break; case FINISHED: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lreceive() - handshake finished, flushing pending requests", toString()); + LOGGER.debug("{} qreceive() - handshake finished, flushing queue", toString()); } this.lfinish(next); this.lflush(next); break; + case NOT_HANDSHAKING: + if 
(success && message.hasRemaining()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} qreceive() - trying to decode more messages, looping", toString()); + } + this.qreceive(next, message); + } + break; } - - return result.bytesConsumed() > 0; } /** @@ -159,7 +192,7 @@ public class SSL2HandlerG0 extends SSL2Handler { } if (this.mEncodeQueue.isEmpty()) { - if (lwrite(next, request) == false) { + if (qwrite(next, request) == false) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request); @@ -185,9 +218,9 @@ public class SSL2HandlerG0 extends SSL2Handler { * @throws SSLException */ @SuppressWarnings("incomplete-switch") - synchronized protected boolean lwrite(final NextFilter next, final WriteRequest request) throws SSLException { + synchronized protected boolean qwrite(final NextFilter next, final WriteRequest request) throws SSLException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - source {}", toString(), request); + LOGGER.debug("{} qwrite() - source {}", toString(), request); } final IoBuffer source = IoBuffer.class.cast(request.getMessage()); @@ -196,7 +229,7 @@ public class SSL2HandlerG0 extends SSL2Handler { final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), + LOGGER.debug("{} qwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus()); } @@ -206,7 +239,7 @@ public class SSL2HandlerG0 extends SSL2Handler { if (result.bytesConsumed() == 0) { EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - result {}", toString(), encrypted); + LOGGER.debug("{} qwrite() - result {}", toString(), encrypted); } 
next.filterWrite(this.mSession, encrypted); } else { @@ -216,11 +249,11 @@ public class SSL2HandlerG0 extends SSL2Handler { EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); this.mAckQueue.add(encrypted); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - result {}", toString(), encrypted); + LOGGER.debug("{} qwrite() - result {}", toString(), encrypted); } next.filterWrite(this.mSession, encrypted); if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) { - return lwrite(next, request); // write additional chunks + return qwrite(next, request); // write additional chunks } return false; } else { @@ -228,7 +261,7 @@ public class SSL2HandlerG0 extends SSL2Handler { EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request); this.mAckQueue.add(encrypted); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - result {}", toString(), encrypted); + LOGGER.debug("{} qwrite() - result {}", toString(), encrypted); } next.filterWrite(this.mSession, encrypted); return true; @@ -239,21 +272,21 @@ public class SSL2HandlerG0 extends SSL2Handler { switch (result.getHandshakeStatus()) { case NEED_TASK: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake needs task, scheduling tasks", toString()); + LOGGER.debug("{} qwrite() - handshake needs task, scheduling", toString()); } this.schedule_task(next); break; case NEED_WRAP: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake needs to encode a message", toString()); + LOGGER.debug("{} qwrite() - handshake needs wrap, looping", toString()); } - return this.lwrite(next, request); + return this.qwrite(next, request); case FINISHED: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake finished, flushing pending requests", toString()); + LOGGER.debug("{} qwrite() - handshake finished, flushing queue", toString()); } this.lfinish(next); - if (this.lwrite(next, request)) { + if (this.qwrite(next, request)) { this.lflush(next); return 
true; } @@ -271,24 +304,64 @@ public class SSL2HandlerG0 extends SSL2Handler { * @return {@code true} if a message was generated and written * @throws SSLException */ - @SuppressWarnings("incomplete-switch") - synchronized protected boolean lwrite(NextFilter next) throws SSLException { - + synchronized protected boolean qwrite(NextFilter next) throws SSLException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - internal", toString()); + LOGGER.debug("{} qwrite() - internal", toString()); } final IoBuffer source = ZERO; final IoBuffer dest = allocate_encode_buffer(source.remaining()); + return lwrite(next, source, dest); + } + + /** + * Attempts to generate a handshake message and write the data to the IoSession. + * <p> + * If FAST_HANDSHAKE is enabled, this method will recursively loop in order to + * combine multiple messages into one buffer. + * + * @param session + * @param next + * @return {@code true} if a message was generated and written + * @throws SSLException + */ + @SuppressWarnings("incomplete-switch") + protected boolean lwrite(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException { final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}", toString(), result.bytesConsumed(), - result.bytesProduced()); + LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), + result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus()); } - if (result.bytesProduced() == 0) { + if (ENABLE_FAST_HANDSHAKE) { + /** + * Fast handshaking allows multiple handshake messages to be written to a single + * buffer. This reduces the number of network messages used during the handshake + * process. 
+ * + * Additional handshake messages are only written if a message was produced in + * the last loop; otherwise, any additional messages that need to be written for + * NEED_WRAP are handled by the standard routine below, which allocates a new + * buffer. + */ + switch (result.getHandshakeStatus()) { + case NEED_WRAP: + switch (result.getStatus()) { + case OK: + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} lwrite() - handshake needs wrap, fast looping", toString()); + } + return lwrite(next, source, dest); + } + break; + } + } + + final boolean success = dest.position() != 0; + + if (success == false) { dest.free(); } else { dest.flip(); @@ -300,30 +373,42 @@ public class SSL2HandlerG0 extends SSL2Handler { } switch (result.getHandshakeStatus()) { - case NEED_TASK: + case NEED_UNWRAP: + case NEED_UNWRAP_AGAIN: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake needs task, scheduling tasks", toString()); + LOGGER.debug("{} lwrite() - handshake needs unwrap, invoking receive", toString()); } - this.schedule_task(next); + this.receive(next, ZERO); break; case NEED_WRAP: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake needs to encode a message", toString()); + LOGGER.debug("{} lwrite() - handshake needs wrap, looping", toString()); + } + this.qwrite(next); + break; + case NEED_TASK: + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("{} lwrite() - handshake needs task, scheduling", toString()); } - this.lwrite(next); + this.schedule_task(next); break; case FINISHED: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake finished, flushing pending requests", toString()); + LOGGER.debug("{} lwrite() - handshake finished, flushing queue", toString()); } this.lfinish(next); this.lflush(next); break; } - return result.bytesProduced() > 0; + return success; } + /** + * Marks the handshake as complete and emits any signals + * + * @param next + */ synchronized protected void lfinish(final NextFilter next) { 
this.mHandshakeComplete = true; next.event(this.mSession, SslEvent.SECURED); @@ -348,7 +433,7 @@ public class SSL2HandlerG0 extends SSL2Handler { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} flush() - {}", toString(), current); } - if (lwrite(next, current) == false) { + if (qwrite(next, current) == false) { this.mEncodeQueue.addFirst(current); break; } @@ -367,19 +452,23 @@ public class SSL2HandlerG0 extends SSL2Handler { } mEngine.closeOutbound(); - this.lwrite(next); + this.qwrite(next); } protected void schedule_task(final NextFilter next) { - if (this.mExecutor == null) { - this.execute_task(next); + if (ENABLE_ASYNC_TASKS) { + if (this.mExecutor == null) { + this.execute_task(next); + } else { + this.mExecutor.execute(new Runnable() { + @Override + public void run() { + SSL2HandlerG0.this.execute_task(next); + } + }); + } } else { - this.mExecutor.execute(new Runnable() { - @Override - public void run() { - SSL2HandlerG0.this.execute_task(next); - } - }); + this.execute_task(next); } } @@ -397,7 +486,7 @@ public class SSL2HandlerG0 extends SSL2Handler { LOGGER.debug("{} task() - writing handshake messages", toString()); } - lwrite(next); + qwrite(next); } catch (SSLException e) { e.printStackTrace(); } diff --git a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java index aef4ead..8896875 100644 --- a/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java +++ b/mina-core/src/test/java/org/apache/mina/filter/ssl2/SSL2SimpleTest.java @@ -2,6 +2,7 @@ package org.apache.mina.filter.ssl2; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; @@ -20,8 +21,6 @@ import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoConnector; import 
org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; -import org.apache.mina.core.write.DefaultWriteRequest; -import org.apache.mina.core.write.WriteRequest; import org.apache.mina.filter.ssl.SslDIRMINA937Test; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import org.apache.mina.transport.socket.nio.NioSocketConnector; @@ -63,23 +62,24 @@ public class SSL2SimpleTest { socket_connector.getFilterChain().addFirst("ssl", filter); socket_connector.setHandler(new DebugFilter()); - final InetSocketAddress server_address = new InetSocketAddress("0.0.0.0", 53301); - socket_acceptor.bind(server_address); + socket_acceptor.bind(new InetSocketAddress("0.0.0.0", 0)); + + final SocketAddress server_address = socket_acceptor.getLocalAddress(); final IoFuture connect_future = socket_connector.connect(server_address); connect_future.awaitUninterruptibly(); final IoSession client_socket = connect_future.getSession(); -// try { -// Thread.sleep(1000); -// } catch (InterruptedException e) { -// -// } - client_socket.write(createMosaicRequest()).awaitUninterruptibly(); - client_socket.closeNow(); + try { + Thread.sleep(250); + } catch (InterruptedException e) { + // ignore + } + + client_socket.closeNow().awaitUninterruptibly(); socket_connector.dispose();