This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch 2.2.X in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.2.X by this push: new 3b5e22e Store captured errors in async tasks 3b5e22e is described below commit 3b5e22ec41d02ff975d1138b94e29df7e8883006 Author: Jonathan Valliere <john...@apache.org> AuthorDate: Tue Nov 23 08:50:17 2021 -0500 Store captured errors in async tasks Errors captured during async tasks will now be stored and re-thrown during valid filterchain operations. This allows the filterchain to capture exceptions. I may need to disable ENABLE_ASYNC_TASKS later to ensure that the filterchain captures all exceptions quickly. --- .../java/org/apache/mina/filter/ssl/SSLFilter.java | 13 +- .../org/apache/mina/filter/ssl/SSLHandlerG0.java | 140 +++++++++++++-------- 2 files changed, 93 insertions(+), 60 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java index 400920a..2904c90 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLFilter.java @@ -40,8 +40,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A SSL processor which performs flow control of encrypted information - * on the filter-chain. + * A SSL processor which performs flow control of encrypted information on the + * filter-chain. * <p> * The initial handshake is automatically enabled for "client" sessions once the * filter is added to the filter-chain and the session is connected. @@ -51,8 +51,7 @@ import org.slf4j.LoggerFactory; */ public class SSLFilter extends IoFilterAdapter { /** - * The presence of this attribute in a session indicates that the session is - * secured. + * SSLSession object when the session is secured, otherwise null. */ static public final AttributeKey SSL_SECURED = new AttributeKey(SSLFilter.class, "status"); @@ -73,8 +72,8 @@ public class SSLFilter extends IoFilterAdapter { new LinkedBlockingDeque<Runnable>(), new BasicThreadFactory("ssl-exec", true)); protected final SSLContext mContext; - protected boolean mNeedClientAuth; - protected boolean mWantClientAuth; + protected boolean mNeedClientAuth = false; + protected boolean mWantClientAuth = false; protected String[] mEnabledCipherSuites; protected String[] mEnabledProtocols; @@ -230,7 +229,7 @@ public class SSLFilter extends IoFilterAdapter { * Customization handler for creating the engine * * @param session source session - * @param addr socket address used for fast reconnect + * @param addr socket address used for fast reconnect * @return an SSLEngine */ protected SSLEngine createEngine(IoSession session, InetSocketAddress addr) { diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java index f6072d9..db007b3 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG0.java @@ -90,6 +90,11 @@ public class SSLHandlerG0 extends SSLHandler { protected Thread mDecodeThread = null; /** + * Captured error state + */ + protected SSLException mPendingError = null; + + /** * Instantiates a new handler * * @param p engine @@ -127,7 +132,7 @@ public class SSLHandlerG0 extends SSLHandler { LOGGER.debug("{} open() - begin handshaking", toString()); } this.mEngine.beginHandshake(); - this.write(next); + this.write_handshake(next); } } } @@ -143,7 +148,7 @@ public class SSLHandlerG0 extends SSLHandler { this.mDecodeThread = Thread.currentThread(); final IoBuffer source = resume_decode_buffer(message); try { - this.qreceive(next, source); + this.receive_loop(next, source); } finally { suspend_decode_buffer(source); this.mDecodeThread = null; @@ -152,8 +157,10 @@ public class SSLHandlerG0 extends SSLHandler { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} receive() - recursion", toString()); } - this.qreceive(next, this.mDecodeBuffer); + this.receive_loop(next, this.mDecodeBuffer); } + + this.throw_pending_error(); } /** @@ -165,9 +172,9 @@ public class SSLHandlerG0 extends SSLHandler { * @throws SSLException */ @SuppressWarnings("incomplete-switch") - protected void qreceive(final NextFilter next, final IoBuffer message) throws SSLException { + protected void receive_loop(final NextFilter next, final IoBuffer message) throws SSLException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qreceive() - source {}", toString(), message); + LOGGER.debug("{} receive_loop() - source {}", toString(), message); } final IoBuffer source = message; @@ -176,7 +183,7 @@ public class SSLHandlerG0 extends SSLHandler { final SSLEngineResult result = mEngine.unwrap(source.buf(), dest.buf()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qreceive() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), + LOGGER.debug("{} receive_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus()); } @@ -185,7 +192,7 @@ public class SSLHandlerG0 extends SSLHandler { } else { dest.flip(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qreceive() - result {}", toString(), dest); + LOGGER.debug("{} receive_loop() - result {}", toString(), dest); } next.messageReceived(this.mSession, dest); } @@ -194,35 +201,35 @@ public class SSLHandlerG0 extends SSLHandler { case NEED_UNWRAP: if (result.bytesConsumed() != 0 && message.hasRemaining()) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qreceive() - handshake needs unwrap, looping", toString()); + LOGGER.debug("{} receive_loop() - handshake needs unwrap, looping", toString()); } - this.qreceive(next, message); + this.receive_loop(next, message); } break; case NEED_TASK: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qreceive() - handshake needs task, scheduling", toString()); + LOGGER.debug("{} receive_loop() - handshake needs task, scheduling", toString()); } this.schedule_task(next); break; case NEED_WRAP: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qreceive() - handshake needs wrap, invoking write", toString()); + LOGGER.debug("{} receive_loop() - handshake needs wrap, invoking write", toString()); } - this.write(next); + this.write_handshake(next); break; case FINISHED: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qreceive() - handshake finished, flushing queue", toString()); + LOGGER.debug("{} receive_loop() - handshake finished, flushing queue", toString()); } - this.lfinish(next); + this.finish_handshake(next); break; case NOT_HANDSHAKING: if ((result.bytesProduced() != 0 || result.bytesConsumed() != 0) && message.hasRemaining()) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qreceive() - trying to decode more messages, looping", toString()); + LOGGER.debug("{} receive_loop() - trying to decode more messages, looping", toString()); } - this.qreceive(next, message); + this.receive_loop(next, message); } break; } @@ -241,6 +248,8 @@ public class SSLHandlerG0 extends SSLHandler { } this.flush(next); } + + this.throw_pending_error(); } /** @@ -257,7 +266,7 @@ public class SSLHandlerG0 extends SSLHandler { } if (this.mEncodeQueue.isEmpty()) { - if (this.qwrite(next, request) == false) { + if (this.write_user_loop(next, request) == false) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} write() - unable to write right now, saving request for later", toString(), request); @@ -276,6 +285,8 @@ public class SSLHandlerG0 extends SSLHandler { } this.mEncodeQueue.add(request); } + + this.throw_pending_error(); } /** @@ -290,9 +301,10 @@ public class SSLHandlerG0 extends SSLHandler { * @throws SSLException */ @SuppressWarnings("incomplete-switch") - synchronized protected boolean qwrite(final NextFilter next, final WriteRequest request) throws SSLException { + synchronized protected boolean write_user_loop(final NextFilter next, final WriteRequest request) + throws SSLException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qwrite() - source {}", toString(), request); + LOGGER.debug("{} write_user_loop() - source {}", toString(), request); } final IoBuffer source = IoBuffer.class.cast(request.getMessage()); @@ -301,8 +313,9 @@ public class SSLHandlerG0 extends SSLHandler { final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), - result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus()); + LOGGER.debug("{} write_user_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", + toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(), + result.getHandshakeStatus()); } if (result.bytesProduced() == 0) { @@ -312,7 +325,7 @@ public class SSLHandlerG0 extends SSLHandler { // an handshaking message must have been produced EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qwrite() - result {}", toString(), encrypted); + LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); } next.filterWrite(this.mSession, encrypted); // do not return because we want to enter the handshake switch @@ -323,18 +336,18 @@ public class SSLHandlerG0 extends SSLHandler { EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); this.mAckQueue.add(encrypted); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qwrite() - result {}", toString(), encrypted); + LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); } next.filterWrite(this.mSession, encrypted); if (this.mAckQueue.size() < MAX_UNACK_MESSAGES) { - return qwrite(next, request); // write additional chunks + return write_user_loop(next, request); // write additional chunks } return false; } else { EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, request); this.mAckQueue.add(encrypted); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qwrite() - result {}", toString(), encrypted); + LOGGER.debug("{} write_user_loop() - result {}", toString(), encrypted); } next.filterWrite(this.mSession, encrypted); return true; @@ -346,21 +359,21 @@ public class SSLHandlerG0 extends SSLHandler { switch (result.getHandshakeStatus()) { case NEED_TASK: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qwrite() - handshake needs task, scheduling", toString()); + LOGGER.debug("{} write_user_loop() - handshake needs task, scheduling", toString()); } this.schedule_task(next); break; case NEED_WRAP: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qwrite() - handshake needs wrap, looping", toString()); + LOGGER.debug("{} write_user_loop() - handshake needs wrap, looping", toString()); } - return this.qwrite(next, request); + return this.write_user_loop(next, request); case FINISHED: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} qwrite() - handshake finished, flushing queue", toString()); + LOGGER.debug("{} write_user_loop() - handshake finished, flushing queue", toString()); } - this.lfinish(next); - return this.qwrite(next, request); + this.finish_handshake(next); + return this.write_user_loop(next, request); } return false; @@ -375,14 +388,14 @@ public class SSLHandlerG0 extends SSLHandler { * * @throws SSLException */ - synchronized public boolean write(NextFilter next) throws SSLException { + synchronized protected boolean write_handshake(NextFilter next) throws SSLException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} write() - internal", toString()); + LOGGER.debug("{} write_handshake() - internal", toString()); } final IoBuffer source = ZERO; final IoBuffer dest = allocate_encode_buffer(source.remaining()); - return lwrite(next, source, dest); + return write_handshake_loop(next, source, dest); } /** @@ -400,7 +413,7 @@ public class SSLHandlerG0 extends SSLHandler { * @throws SSLException */ @SuppressWarnings("incomplete-switch") - protected boolean lwrite(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException { + protected boolean write_handshake_loop(NextFilter next, IoBuffer source, IoBuffer dest) throws SSLException { if (this.mOutboundClosing && this.mEngine.isOutboundDone()) { return false; } @@ -408,8 +421,9 @@ public class SSLHandlerG0 extends SSLHandler { final SSLEngineResult result = this.mEngine.wrap(source.buf(), dest.buf()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", toString(), - result.bytesConsumed(), result.bytesProduced(), result.getStatus(), result.getHandshakeStatus()); + LOGGER.debug("{} write_handshake_loop() - bytes-consumed {}, bytes-produced {}, status {}, handshake {}", + toString(), result.bytesConsumed(), result.bytesProduced(), result.getStatus(), + result.getHandshakeStatus()); } if (ENABLE_FAST_HANDSHAKE) { @@ -428,9 +442,10 @@ public class SSLHandlerG0 extends SSLHandler { switch (result.getStatus()) { case OK: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake needs wrap, fast looping", toString()); + LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, fast looping", + toString()); } - return lwrite(next, source, dest); + return write_handshake_loop(next, source, dest); } break; } @@ -443,7 +458,7 @@ public class SSLHandlerG0 extends SSLHandler { } else { dest.flip(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - result {}", toString(), dest); + LOGGER.debug("{} write_handshake_loop() - result {}", toString(), dest); } final EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest, null); next.filterWrite(this.mSession, encrypted); @@ -452,27 +467,28 @@ public class SSLHandlerG0 extends SSLHandler { switch (result.getHandshakeStatus()) { case NEED_UNWRAP: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake needs unwrap, invoking receive", toString()); + LOGGER.debug("{} lwrwrite_handshake_loopite() - handshake needs unwrap, invoking receive", + toString()); } this.receive(next, ZERO); break; case NEED_WRAP: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake needs wrap, looping", toString()); + LOGGER.debug("{} write_handshake_loop() - handshake needs wrap, looping", toString()); } - this.write(next); + this.write_handshake(next); break; case NEED_TASK: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake needs task, scheduling", toString()); + LOGGER.debug("{} write_handshake_loop() - handshake needs task, scheduling", toString()); } this.schedule_task(next); break; case FINISHED: if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} lwrite() - handshake finished, flushing queue", toString()); + LOGGER.debug("{} write_handshake_loop() - handshake finished, flushing queue", toString()); } - this.lfinish(next); + this.finish_handshake(next); break; } @@ -485,7 +501,7 @@ public class SSLHandlerG0 extends SSLHandler { * @param next * @throws SSLException */ - synchronized protected void lfinish(final NextFilter next) throws SSLException { + synchronized protected void finish_handshake(final NextFilter next) throws SSLException { if (this.mHandshakeComplete == false) { this.mHandshakeComplete = true; this.mSession.setAttribute(SSLFilter.SSL_SECURED, this.mEngine.getSession()); @@ -522,7 +538,7 @@ public class SSLHandlerG0 extends SSLHandler { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} flush() - {}", toString(), current); } - if (this.qwrite(next, current) == false) { + if (this.write_user_loop(next, current) == false) { this.mEncodeQueue.addFirst(current); break; } @@ -530,7 +546,7 @@ public class SSLHandlerG0 extends SSLHandler { if (this.mOutboundClosing && this.mEncodeQueue.size() == 0) { this.mEngine.closeOutbound(); - this.write(next); + this.write_handshake(next); } } @@ -544,7 +560,7 @@ public class SSLHandlerG0 extends SSLHandler { if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} close() - closing session", toString()); } - + if (this.mHandshakeComplete) { next.event(this.mSession, SSLEvent.UNSECURED); } @@ -559,12 +575,27 @@ public class SSLHandlerG0 extends SSLHandler { this.mEncodeQueue.clear(); } this.mEngine.closeOutbound(); - this.write(next); + this.write_handshake(next); } else { this.flush(next); } } + synchronized protected void throw_pending_error() throws SSLException { + final SSLException e = this.mPendingError; + if (e != null) { + this.mPendingError = null; + throw e; + } + } + + synchronized protected void store_pending_error(SSLException e) { + SSLException x = this.mPendingError; + if (x == null) { + this.mPendingError = e; + } + } + protected void schedule_task(final NextFilter next) { if (ENABLE_ASYNC_TASKS) { if (this.mExecutor == null) { @@ -596,9 +627,12 @@ public class SSLHandlerG0 extends SSLHandler { LOGGER.debug("{} task() - writing handshake messages", toString()); } - write(next); + write_handshake(next); } catch (SSLException e) { - e.printStackTrace(); + this.store_pending_error(e); + if (LOGGER.isErrorEnabled()) { + LOGGER.error("{} task() - storing error {}", toString(), e); + } } } }