This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git


The following commit(s) were added to refs/heads/master by this push:
     new ba8095e  [SSHD-912] Use separate locks for Future(s) and Session/Channel instances
ba8095e is described below

commit ba8095edaa2dc190661f4c1d4b9a6260bce2d51b
Author: Lyor Goldstein <lgoldst...@apache.org>
AuthorDate: Tue Apr 30 08:59:39 2019 +0300

    [SSHD-912] Use separate locks for Future(s) and Session/Channel instances
---
 CHANGES.md                                         |  4 +-
 .../common/util/closeable/AbstractCloseable.java   | 12 ++-
 .../sshd/client/channel/AbstractClientChannel.java | 75 ++++++++++------
 .../sshd/client/channel/ChannelDirectTcpip.java    |  2 +-
 .../keyverifier/DelegatingServerKeyVerifier.java   | 17 ++--
 .../apache/sshd/client/session/ClientSession.java  | 14 ++-
 .../sshd/client/session/ClientSessionImpl.java     | 99 ++++++++++++++--------
 .../sshd/client/session/ClientUserAuthService.java |  2 +-
 .../sshd/common/channel/AbstractChannel.java       |  6 +-
 .../sshd/common/forward/TcpipClientChannel.java    |  2 +-
 .../session/helpers/AbstractConnectionService.java | 24 ++++--
 .../common/session/helpers/AbstractSession.java    | 12 +--
 .../sshd/common/session/helpers/SessionHelper.java | 10 +--
 .../apache/sshd/server/channel/ChannelSession.java |  2 +-
 .../sshd/server/x11/ChannelForwardedX11.java       |  2 +-
 .../test/java/org/apache/sshd/KeepAliveTest.java   | 46 ++++++----
 .../java/org/apache/sshd/client/ClientTest.java    |  5 +-
 .../sshd/client/channel/ChannelExecTest.java       |  4 +-
 .../session/helpers/AbstractSessionTest.java       |  2 +
 .../sshd/deprecated/ClientUserAuthServiceOld.java  |  2 +-
 .../java/org/apache/sshd/server/ServerTest.java    | 22 +++--
 .../sshd/server/channel/ChannelSessionTest.java    | 43 ++++++----
 .../org/apache/sshd/util/test/BogusChannel.java    |  2 +-
 .../apache/sshd/common/io/mina/MinaSession.java    |  2 +-
 .../org/apache/sshd/netty/NettyIoAcceptor.java     |  2 -
 .../java/org/apache/sshd/netty/NettyIoSession.java |  5 +-
 26 files changed, 267 insertions(+), 151 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 273852b..182c736 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -126,4 +126,6 @@ and therefore closing all currently tracked file/directory handles.
 
 * [SSHD-909](https://issues.apache.org/jira/browse/SSHD-909) - SFTP versions extension handler ignores non-numerical versions when resolving the available ones.
 
-* [SSHD-913](https://issues.apache.org/jira/browse/SSHD-913) - Provide channel 
session instance to command and/or shell factories creators     
\ No newline at end of file
+* [SSHD-913](https://issues.apache.org/jira/browse/SSHD-913) - Provide channel 
session instance to command and/or shell factories creators     
+
+* [SSHD-912](https://issues.apache.org/jira/browse/SSHD-912) - Use separate locks for Future(s) and Session/Channel instances.
diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java b/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
index 6413ebb..e503431 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
@@ -37,9 +37,9 @@ public abstract class AbstractCloseable extends IoBaseCloseable {
     }
 
     /**
-     * Lock object for this session state
+     * Lock object for {@code Future}-s based on this closeable instance
      */
-    protected final Object lock = new Object();
+    protected final Object futureLock = new Object();
 
     /**
      * State of this object
@@ -57,7 +57,11 @@ public abstract class AbstractCloseable extends 
IoBaseCloseable {
 
     protected AbstractCloseable(String discriminator) {
         super(discriminator);
-        closeFuture = new DefaultCloseFuture(discriminator, lock);
+        closeFuture = new DefaultCloseFuture(discriminator, futureLock);
+    }
+
+    public Object getFutureLock() {
+        return futureLock;
     }
 
     @Override
@@ -157,6 +161,6 @@ public abstract class AbstractCloseable extends 
IoBaseCloseable {
     }
 
     protected Builder builder() {
-        return new Builder(lock);
+        return new Builder(futureLock);
     }
 }
\ No newline at end of file
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 2673031..611e789 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -211,10 +212,15 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
     @Override
     public Set<ClientChannelEvent> waitFor(Collection<ClientChannelEvent> 
mask, long timeout) {
         Objects.requireNonNull(mask, "No mask specified");
-        long t = 0;
         boolean debugEnabled = log.isDebugEnabled();
         boolean traceEnabled = log.isTraceEnabled();
-        synchronized (lock) {
+        long startTime = System.currentTimeMillis();
+        /*
+         * NOTE !!! we must use the futureLock since some of the events that
+         * we wait on are related to open/close future(s)
+         */
+        synchronized (futureLock) {
+            long remWait = timeout;
             for (Set<ClientChannelEvent> cond = 
EnumSet.noneOf(ClientChannelEvent.class);; cond.clear()) {
                 updateCurrentChannelState(cond);
                 if (debugEnabled) {
@@ -229,48 +235,57 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
                 boolean nothingInCommon = Collections.disjoint(mask, cond);
                 if (!nothingInCommon) {
                     if (traceEnabled) {
-                        log.trace("WaitFor call returning on channel {}, 
mask={}, cond={}", this, mask, cond);
+                        log.trace("waitFor({}) call returning mask={}, 
cond={}", this, mask, cond);
                     }
                     return cond;
                 }
 
                 if (timeout > 0L) {
-                    if (t == 0L) {
-                        t = System.currentTimeMillis() + timeout;
-                    } else {
-                        timeout = t - System.currentTimeMillis();
-                        if (timeout <= 0L) {
-                            if (traceEnabled) {
-                                log.trace("WaitFor call timeout on channel {}, 
mask={}", this, mask);
-                            }
-                            cond.add(ClientChannelEvent.TIMEOUT);
-                            return cond;
+                    long now = System.currentTimeMillis();
+                    long usedTime = now - startTime;
+                    if ((usedTime >= timeout) || (remWait <= 0L)) {
+                        if (traceEnabled) {
+                            log.trace("waitFor({}) call timeout {}/{} for 
mask={}: {}",
+                                this, usedTime, timeout, mask, cond);
                         }
+                        cond.add(ClientChannelEvent.TIMEOUT);
+                        return cond;
                     }
                 }
 
                 if (traceEnabled) {
-                    log.trace("Waiting {} millis for lock on channel {}, 
mask={}, cond={}", timeout, this, mask, cond);
+                    log.trace("waitFor({}) waiting {} millis for lock - 
mask={}, cond={}",
+                        this, remWait, mask, cond);
                 }
 
                 long nanoStart = System.nanoTime();
                 try {
                     if (timeout > 0L) {
-                        lock.wait(timeout);
+                        futureLock.wait(remWait);
                     } else {
-                        lock.wait();
+                        futureLock.wait();
                     }
 
                     long nanoEnd = System.nanoTime();
                     long nanoDuration = nanoEnd - nanoStart;
                     if (traceEnabled) {
-                        log.trace("Lock notified on channel {} after {} 
nanos", this, nanoDuration);
+                        log.trace("waitFor({}) lock notified on channel after 
{} nanos", this, nanoDuration);
+                    }
+
+                    if (timeout > 0L) {
+                        long waitDuration =
+                            TimeUnit.MILLISECONDS.convert(nanoDuration, 
TimeUnit.NANOSECONDS);
+                        if (waitDuration <= 0L) {
+                            waitDuration = 123L;
+                        }
+                        remWait -= waitDuration;
                     }
                 } catch (InterruptedException e) {
                     long nanoEnd = System.nanoTime();
                     long nanoDuration = nanoEnd - nanoStart;
                     if (traceEnabled) {
-                        log.trace("waitFor({}) mask={} - ignoring interrupted 
exception after {} nanos", this, mask, nanoDuration);
+                        log.trace("waitFor({}) mask={} - ignoring interrupted 
exception after {} nanos",
+                            this, mask, nanoDuration);
                     }
                 }
             }
@@ -279,8 +294,9 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
 
     @Override
     public Set<ClientChannelEvent> getChannelState() {
-        Set<ClientChannelEvent> cond = 
EnumSet.noneOf(ClientChannelEvent.class);
-        synchronized (lock) {
+        Set<ClientChannelEvent> cond =
+            EnumSet.noneOf(ClientChannelEvent.class);
+        synchronized (futureLock) {
             return updateCurrentChannelState(cond);
         }
     }
@@ -312,7 +328,7 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
             throw new SshException("Session has been closed");
         }
 
-        openFuture = new DefaultOpenFuture(this.toString(), lock);
+        openFuture = new DefaultOpenFuture(this.toString(), futureLock);
         String type = getChannelType();
         if (log.isDebugEnabled()) {
             log.debug("open({}) Send SSH_MSG_CHANNEL_OPEN - type={}", this, 
type);
@@ -320,7 +336,8 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
 
         Session session = getSession();
         Window wLocal = getLocalWindow();
-        Buffer buffer = 
session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN, type.length() + 
Integer.SIZE);
+        Buffer buffer =
+            session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN, 
type.length() + Integer.SIZE);
         buffer.putString(type);
         buffer.putInt(getId());
         buffer.putInt(wLocal.getSize());
@@ -331,7 +348,8 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
 
     @Override
     public OpenFuture open(int recipient, long rwSize, long packetSize, Buffer 
buffer) {
-        throw new UnsupportedOperationException("open(" + recipient + "," + 
rwSize + "," + packetSize + ") N/A");
+        throw new UnsupportedOperationException(
+            "open(" + recipient + "," + rwSize + "," + packetSize + ") N/A");
     }
 
     @Override
@@ -339,7 +357,8 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
         setRecipient(recipient);
 
         Session session = getSession();
-        FactoryManager manager = 
Objects.requireNonNull(session.getFactoryManager(), "No factory manager");
+        FactoryManager manager =
+            Objects.requireNonNull(session.getFactoryManager(), "No factory 
manager");
         Window wRemote = getRemoteWindow();
         wRemote.init(rwSize, packetSize, manager);
 
@@ -371,7 +390,7 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
         String lang = buffer.getString();
         if (log.isDebugEnabled()) {
             log.debug("handleOpenFailure({}) reason={}, lang={}, msg={}",
-                      this, SshConstants.getOpenErrorCodeName(reason), lang, 
msg);
+                this, SshConstants.getOpenErrorCodeName(reason), lang, msg);
         }
 
         this.openFailureReason = reason;
@@ -389,7 +408,8 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
         if (isClosing()) {
             return;
         }
-        ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds 
int boundaries: %d", len);
+        ValidateUtils.checkTrue(
+            len <= Integer.MAX_VALUE, "Data length exceeds int boundaries: 
%d", len);
 
         if (asyncOut != null) {
             asyncOut.write(new ByteArrayBuffer(data, off, (int) len));
@@ -412,7 +432,8 @@ public abstract class AbstractClientChannel extends 
AbstractChannel implements C
         if (isClosing()) {
             return;
         }
-        ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Extended data 
length exceeds int boundaries: %d", len);
+        ValidateUtils.checkTrue(
+            len <= Integer.MAX_VALUE, "Extended data length exceeds int 
boundaries: %d", len);
 
         if (asyncErr != null) {
             asyncErr.write(new ByteArrayBuffer(data, off, (int) len));
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
index 52b7849..55ffcb8 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
@@ -71,7 +71,7 @@ public class ChannelDirectTcpip extends AbstractClientChannel 
{
             throw new SshException("Session has been closed");
         }
 
-        openFuture = new DefaultOpenFuture(remote, lock);
+        openFuture = new DefaultOpenFuture(remote, futureLock);
         if (log.isDebugEnabled()) {
             log.debug("open({}) SSH_MSG_CHANNEL_OPEN", this);
         }
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/keyverifier/DelegatingServerKeyVerifier.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/keyverifier/DelegatingServerKeyVerifier.java
index 9c272aa..419b82f 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/keyverifier/DelegatingServerKeyVerifier.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/keyverifier/DelegatingServerKeyVerifier.java
@@ -25,10 +25,10 @@ import java.util.Map;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.common.util.logging.AbstractLoggingBean;
 
-/*
- * A ServerKeyVerifier that delegates verification to the ServerKeyVerifier 
found in the ClientSession metadata
- * The ServerKeyVerifier can be specified at the SshClient level, which may 
have connections to multiple hosts.
- * This technique lets each connection have its own ServerKeyVerifier.
+/**
+ * A {@link ServerKeyVerifier} that delegates verification to the instance 
found in the {@link ClientSession} metadata
+ * The verifier can be specified at the {@code SshClient} level, which may 
have connections to multiple hosts.
+ * This technique lets each connection have its own verifier instance.
  *
  * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a>
  */
@@ -38,16 +38,17 @@ public class DelegatingServerKeyVerifier extends 
AbstractLoggingBean implements
     }
 
     @Override
-    public boolean verifyServerKey(ClientSession sshClientSession, 
SocketAddress remoteAddress, PublicKey serverKey) {
-        Map<Object, Object> metadataMap = sshClientSession.getMetadataMap();
+    public boolean verifyServerKey(
+            ClientSession session, SocketAddress remoteAddress, PublicKey 
serverKey) {
+        Map<Object, Object> metadataMap = session.getMetadataMap();
         Object verifier = metadataMap.get(ServerKeyVerifier.class);
         if (verifier == null) {
             if (log.isTraceEnabled()) {
-                log.trace("verifyServerKey(" + remoteAddress + ") No verifier 
found in ClientSession metadata; accepting server key");
+                log.trace("verifyServerKey({}) No verifier found in 
ClientSession metadata; accepting server key", remoteAddress);
             }
             return true;
         }
         // We throw if it's not a ServerKeyVerifier...
-        return ((ServerKeyVerifier) 
verifier).verifyServerKey(sshClientSession, remoteAddress, serverKey);
+        return ((ServerKeyVerifier) verifier).verifyServerKey(session, 
remoteAddress, serverKey);
     }
 }
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
index 950227c..5dccc3c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java
@@ -370,20 +370,28 @@ public interface ClientSession
     }
 
     /**
+     * @return A snapshot of the current session state
+     * @see #waitFor(Collection, long)
+     */
+    Set<ClientSessionEvent> getSessionState();
+
+    /**
      * Wait for any one of a specific state to be signaled.
      *
      * @param mask    The request {@link ClientSessionEvent}s mask
      * @param timeout Wait time in milliseconds - non-positive means forever
      * @return The actual state that was detected either due to the mask
-     * yielding one of the states or due to timeout (in which case the {@link 
ClientSessionEvent#TIMEOUT}
-     * value is set)
+     * yielding one of the states or due to timeout (in which case the
+     * {@link ClientSessionEvent#TIMEOUT} value is set)
      */
     Set<ClientSessionEvent> waitFor(Collection<ClientSessionEvent> mask, long 
timeout);
 
     /**
      * Access to the metadata.
      *
-     * @return The metadata {@link Map}
+     * @return The metadata {@link Map} - <B>Note:</B> access to the map
+     * is not {@code synchronized} in any way - up to the user to take care
+     * of mutual exclusion if necessary
      */
     Map<Object, Object> getMetadataMap();
 
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index bc9c6a2..73558ac 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.sshd.client.ClientFactoryManager;
 import org.apache.sshd.client.future.AuthFuture;
@@ -85,7 +86,7 @@ public class ClientSessionImpl extends AbstractClientSession {
             nextServiceFactory = null;
         }
 
-        authFuture = new DefaultAuthFuture(ioSession.getRemoteAddress(), lock);
+        authFuture = new DefaultAuthFuture(ioSession.getRemoteAddress(), 
futureLock);
         authFuture.setAuthed(false);
 
         signalSessionCreated(ioSession);
@@ -118,9 +119,11 @@ public class ClientSessionImpl extends 
AbstractClientSession {
         }
 
         ClientUserAuthService authService = getUserAuthService();
-        synchronized (lock) {
+        synchronized (sessionLock) {
             String serviceName = nextServiceName();
-            authFuture = 
ValidateUtils.checkNotNull(authService.auth(serviceName), "No auth future 
generated by service=%s", serviceName);
+            authFuture =
+                ValidateUtils.checkNotNull(
+                    authService.auth(serviceName), "No auth future generated 
by service=%s", serviceName);
             return authFuture;
         }
     }
@@ -145,7 +148,7 @@ public class ClientSessionImpl extends 
AbstractClientSession {
 
     protected void signalAuthFailure(AuthFuture future, Throwable t) {
         boolean signalled = false;
-        synchronized (lock) {
+        synchronized (sessionLock) {
             if ((future != null) && (!future.isDone())) {
                 future.setException(t);
                 signalled = true;
@@ -159,13 +162,13 @@ public class ClientSessionImpl extends 
AbstractClientSession {
     }
 
     protected String nextServiceName() {
-        synchronized (lock) {
+        synchronized (sessionLock) {
             return nextServiceFactory.getName();
         }
     }
 
     public void switchToNextService() throws IOException {
-        synchronized (lock) {
+        synchronized (sessionLock) {
             if (nextService == null) {
                 throw new IllegalStateException("No service available");
             }
@@ -182,8 +185,8 @@ public class ClientSessionImpl extends 
AbstractClientSession {
         if (SessionListener.Event.KeyEstablished.equals(event)) {
             sendInitialServiceRequest();
         }
-        synchronized (lock) {
-            lock.notifyAll();
+        synchronized (futureLock) {
+            futureLock.notifyAll();
         }
         super.signalSessionEvent(event);
     }
@@ -211,53 +214,49 @@ public class ClientSessionImpl extends 
AbstractClientSession {
     @Override
     public Set<ClientSessionEvent> waitFor(Collection<ClientSessionEvent> 
mask, long timeout) {
         Objects.requireNonNull(mask, "No mask specified");
-        long t = 0L;
         boolean traceEnabled = log.isTraceEnabled();
-        synchronized (lock) {
+        long startTime = System.currentTimeMillis();
+        /*
+         * NOTE: we need to use the futureLock since some of the events
+         * depend on auth/kex/close future(s)
+         */
+        synchronized (futureLock) {
+            long remWait = timeout;
             for (Set<ClientSessionEvent> cond = 
EnumSet.noneOf(ClientSessionEvent.class);; cond.clear()) {
-                if (closeFuture.isClosed()) {
-                    cond.add(ClientSessionEvent.CLOSED);
-                }
-                if (isAuthenticated()) { // authFuture.isSuccess()
-                    cond.add(ClientSessionEvent.AUTHED);
-                }
-                if (KexState.DONE.equals(kexState.get()) && 
authFuture.isFailure()) {
-                    cond.add(ClientSessionEvent.WAIT_AUTH);
-                }
+                updateCurrentSessionState(cond);
 
                 boolean nothingInCommon = Collections.disjoint(cond, mask);
                 if (!nothingInCommon) {
                     if (traceEnabled) {
-                        log.trace("waitFor(}{}) call return mask={}, cond={}", 
this, mask, cond);
+                        log.trace("waitFor({}) call return mask={}, cond={}", 
this, mask, cond);
                     }
                     return cond;
                 }
 
                 if (timeout > 0L) {
-                    if (t == 0L) {
-                        t = System.currentTimeMillis() + timeout;
-                    } else {
-                        timeout = t - System.currentTimeMillis();
-                        if (timeout <= 0L) {
-                            if (traceEnabled) {
-                                log.trace("WaitFor({}) call timeout mask={}", 
this, mask);
-                            }
-                            cond.add(ClientSessionEvent.TIMEOUT);
-                            return cond;
+                    long now = System.currentTimeMillis();
+                    long usedTime = now - startTime;
+                    if ((usedTime >= timeout) || (remWait <= 0L)) {
+                        if (traceEnabled) {
+                            log.trace("waitFor({}) call timeout {}/{} for 
mask={}: {}",
+                                this, usedTime, timeout, mask, cond);
                         }
+                        cond.add(ClientSessionEvent.TIMEOUT);
+                        return cond;
                     }
                 }
 
                 if (traceEnabled) {
-                    log.trace("waitFor({}) Waiting {} millis for lock on 
mask={}, cond={}", this, timeout, mask, cond);
+                    log.trace("waitFor({}) Waiting {} millis for lock on 
mask={}, cond={}",
+                        this, timeout, mask, cond);
                 }
 
                 long nanoStart = System.nanoTime();
                 try {
-                    if (timeout > 0) {
-                        lock.wait(timeout);
+                    if (timeout > 0L) {
+                        futureLock.wait(remWait);
                     } else {
-                        lock.wait();
+                        futureLock.wait();
                     }
 
                     long nanoEnd = System.nanoTime();
@@ -265,6 +264,15 @@ public class ClientSessionImpl extends 
AbstractClientSession {
                     if (traceEnabled) {
                         log.trace("waitFor({}) Lock notified after {} nanos", 
this, nanoDuration);
                     }
+
+                    if (timeout > 0L) {
+                        long waitDuration =
+                            TimeUnit.MILLISECONDS.convert(nanoDuration, 
TimeUnit.NANOSECONDS);
+                        if (waitDuration <= 0L) {
+                            waitDuration = 123L;
+                        }
+                        remWait -= waitDuration;
+                    }
                 } catch (InterruptedException e) {
                     long nanoEnd = System.nanoTime();
                     long nanoDuration = nanoEnd - nanoStart;
@@ -277,6 +285,29 @@ public class ClientSessionImpl extends 
AbstractClientSession {
     }
 
     @Override
+    public Set<ClientSessionEvent> getSessionState() {
+        Set<ClientSessionEvent> state = 
EnumSet.noneOf(ClientSessionEvent.class);
+        synchronized (futureLock) {
+            return updateCurrentSessionState(state);
+        }
+    }
+
+    // NOTE: assumed to be called under lock
+    protected <C extends Collection<ClientSessionEvent>> C 
updateCurrentSessionState(C state) {
+        if (closeFuture.isClosed()) {
+            state.add(ClientSessionEvent.CLOSED);
+        }
+        if (isAuthenticated()) { // authFuture.isSuccess()
+            state.add(ClientSessionEvent.AUTHED);
+        }
+        if (KexState.DONE.equals(kexState.get()) && authFuture.isFailure()) {
+            state.add(ClientSessionEvent.WAIT_AUTH);
+        }
+
+        return state;
+    }
+
+    @Override
     public Map<Object, Object> getMetadataMap() {
         return metadataMap;
     }
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
index e45fed7..a8cb5e9 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java
@@ -134,7 +134,7 @@ public class ClientUserAuthService
 
         ClientSession session = getClientSession();
         // check if any previous future in use
-        AuthFuture authFuture = new DefaultAuthFuture(service, 
clientSession.getLock());
+        AuthFuture authFuture = new DefaultAuthFuture(service, 
clientSession.getFutureLock());
         AuthFuture currentFuture = authFutureHolder.getAndSet(authFuture);
         boolean debugEnabled = log.isDebugEnabled();
         if (currentFuture != null) {
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index befc90a..fd0a001 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -127,7 +127,7 @@ public abstract class AbstractChannel
             Collection<? extends RequestHandler<Channel>> handlers,
             CloseableExecutorService executorService) {
         super(discriminator);
-        gracefulFuture = new DefaultCloseFuture(discriminator, lock);
+        gracefulFuture = new DefaultCloseFuture(discriminator, futureLock);
         localWindow = new Window(this, null, client, true);
         remoteWindow = new Window(this, null, client, false);
         channelListenerProxy = EventListenerUtils.proxyWrapper(
@@ -477,8 +477,8 @@ public abstract class AbstractChannel
                 log.debug("notifyStateChanged(" + this + ")[" + hint + "] 
channel state signalling failure details", e);
             }
         } finally {
-            synchronized (lock) {
-                lock.notifyAll();
+            synchronized (futureLock) {
+                futureLock.notifyAll();
             }
         }
     }
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index 37dd824..b99e5d3 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -135,7 +135,7 @@ public class TcpipClientChannel extends 
AbstractClientChannel implements Forward
         }
 
         // make sure the pending messages queue is 1st in line
-        openFuture = new DefaultOpenFuture(src, lock)
+        openFuture = new DefaultOpenFuture(src, futureLock)
             .addListener(getPendingMessagesQueue());
         if (log.isDebugEnabled()) {
             log.debug("open({}) send SSH_MSG_CHANNEL_OPEN", this);
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index 4b40ff1..77cf06c 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -105,7 +105,6 @@ public abstract class AbstractConnectionService
      */
     protected final AtomicInteger nextChannelId = new AtomicInteger(0);
     protected final AtomicLong heartbeatCount = new AtomicLong(0L);
-
     private ScheduledFuture<?> heartBeat;
 
     private final AtomicReference<AgentForwardSupport> agentForwardHolder = 
new AtomicReference<>();
@@ -417,7 +416,7 @@ public abstract class AbstractConnectionService
         channel.init(this, session, channelId);
 
         boolean registered = false;
-        synchronized (lock) {
+        synchronized (channels) {
             if (!isClosing()) {
                 channels.put(channelId, channel);
                 registered = true;
@@ -451,7 +450,11 @@ public abstract class AbstractConnectionService
     @Override
     public void unregisterChannel(Channel channel) {
         int channelId = channel.getId();
-        Channel result = channels.remove(channelId);
+        Channel result;
+        synchronized (channels) {
+            result = channels.remove(channelId);
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("unregisterChannel({}) result={}", channel, result);
         }
@@ -565,10 +568,20 @@ public abstract class AbstractConnectionService
         }
 
         int id = channel.getId();
-        if (log.isDebugEnabled()) {
+        boolean debugEnabled = log.isDebugEnabled();
+        if (debugEnabled) {
             log.debug("channelOpenFailure({}) Received SSH_MSG_CHANNEL_OPEN_FAILURE", channel);
         }
-        channels.remove(id);
+
+        Channel removed;
+        synchronized (channels) {
+            removed = channels.remove(id);
+        }
+
+        if (debugEnabled) {
+            log.debug("channelOpenFailure({}) unregistered {}", channel, removed);
+        }
+
         channel.handleOpenFailure(buffer);
     }
 
@@ -715,7 +728,6 @@ public abstract class AbstractConnectionService
             // Throw a special exception - SSHD-777
             throw new SshChannelNotFoundException(recipient,
                 "Received " + SshConstants.getCommandMessageName(cmd) + " on unknown channel " + recipient);
-
         }
 
         channel = handler.handleUnknownChannelCommand(this, cmd, recipient, buffer);
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index b137681..827c20f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -358,7 +358,7 @@ public abstract class AbstractSession extends SessionHelper {
      */
     protected void handleMessage(Buffer buffer) throws Exception {
         try {
-            synchronized (lock) {
+            synchronized (sessionLock) {
                 doHandleMessage(buffer);
             }
         } catch (Throwable e) {
@@ -658,7 +658,8 @@ public abstract class AbstractSession extends SessionHelper {
     protected void handleNewKeys(int cmd, Buffer buffer) throws Exception {
         boolean debugEnabled = log.isDebugEnabled();
         if (debugEnabled) {
-            log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS command={}", this, 
SshConstants.getCommandMessageName(cmd));
+            log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS command={}",
+                this, SshConstants.getCommandMessageName(cmd));
         }
         validateKexState(cmd, KexState.KEYS);
         receiveNewKeys();
@@ -697,8 +698,8 @@ public abstract class AbstractSession extends SessionHelper {
             }
         }
 
-        synchronized (lock) {
-            lock.notifyAll();
+        synchronized (futureLock) {
+            futureLock.notifyAll();
         }
     }
 
@@ -1809,7 +1810,8 @@ public abstract class AbstractSession extends SessionHelper {
                     + " to generate keys for exchange: " + e.getMessage()), e);
         }
 
-        return ValidateUtils.checkNotNull(kexFutureHolder.get(), "No current 
KEX future on state=%s", kexState.get());
+        return ValidateUtils.checkNotNull(
+            kexFutureHolder.get(), "No current KEX future on state=%s", 
kexState.get());
     }
 
     /**
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
index 86f479b..3ae0e45 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
@@ -76,6 +76,9 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
  * Contains split code in order to make {@link AbstractSession} class smaller
  */
 public abstract class SessionHelper extends AbstractKexFactoryManager implements Session {
+    /** Session level lock for regulating access to sensitive data */
+    protected final Object sessionLock = new Object();
+
     /**
      * Client or server side
      */
@@ -231,7 +234,8 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements
         }
 
         long now = System.currentTimeMillis();
-        Map.Entry<TimeoutStatus, String> result = 
checkAuthenticationTimeout(now, getAuthTimeout());
+        Map.Entry<TimeoutStatus, String> result =
+            checkAuthenticationTimeout(now, getAuthTimeout());
         if (result == null) {
             result = checkIdleTimeout(now, getIdleTimeout());
         }
@@ -337,10 +341,6 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements
         return timeoutStatus.get();
     }
 
-    public Object getLock() {
-        return lock;
-    }
-
     @Override
     public ReservedSessionMessagesHandler getReservedSessionMessagesHandler() {
         return resolveEffectiveProvider(ReservedSessionMessagesHandler.class,
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 906d0a5..cf04394 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -113,7 +113,7 @@ public class ChannelSession extends AbstractServerChannel {
     public ChannelSession(Collection<? extends RequestHandler<Channel>> 
handlers) {
         super("", handlers, null);
 
-        commandExitFuture = new DefaultCloseFuture(getClass().getSimpleName(), lock);
+        commandExitFuture = new DefaultCloseFuture(getClass().getSimpleName(), futureLock);
     }
 
     @Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
index 8865854..a7abc9f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java
@@ -53,7 +53,7 @@ public class ChannelForwardedX11 extends AbstractClientChannel {
         if (closeFuture.isClosed()) {
             throw new SshException("Session has been closed");
         }
-        openFuture = new DefaultOpenFuture(remote, lock);
+        openFuture = new DefaultOpenFuture(remote, futureLock);
 
         Session session = getSession();
         if (log.isDebugEnabled()) {
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
index b2a164e..efba1ae 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
@@ -53,8 +53,8 @@ import org.junit.runners.MethodSorters;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class KeepAliveTest extends BaseTestSupport {
     private static final long HEARTBEAT = TimeUnit.SECONDS.toMillis(2L);
-    private static final long TIMEOUT = 2L * HEARTBEAT;
-    private static final long WAIT = 2L * TIMEOUT;
+    private static final long TIMEOUT = HEARTBEAT * 2L;
+    private static final long WAIT = 3L * TIMEOUT;
 
     private static SshServer sshd;
     private static int port;
@@ -101,20 +101,27 @@ public class KeepAliveTest extends BaseTestSupport {
 
     @After
     public void tearDown() {
-        PropertyResolverUtils.updateProperty(sshd, 
FactoryManager.IDLE_TIMEOUT, FactoryManager.DEFAULT_IDLE_TIMEOUT);
-        PropertyResolverUtils.updateProperty(client, 
ClientFactoryManager.HEARTBEAT_INTERVAL, 
ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL);
+        PropertyResolverUtils.updateProperty(
+            sshd, FactoryManager.IDLE_TIMEOUT, 
FactoryManager.DEFAULT_IDLE_TIMEOUT);
+        PropertyResolverUtils.updateProperty(
+            client, ClientFactoryManager.HEARTBEAT_INTERVAL, 
ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL);
     }
 
     @Test
     public void testIdleClient() throws Exception {
-        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
+                .verify(7L, TimeUnit.SECONDS)
+                .getSession()) {
             session.addPasswordIdentity(getCurrentTestName());
             session.auth().verify(5L, TimeUnit.SECONDS);
 
             try (ClientChannel channel = 
session.createChannel(Channel.CHANNEL_SHELL)) {
+                long waitStart = System.currentTimeMillis();
                 Collection<ClientChannelEvent> result =
-                        channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
WAIT);
-                assertTrue("Wrong channel state: " + result, 
result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED)));
+                    channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
WAIT);
+                long waitEnd = System.currentTimeMillis();
+                assertTrue("Wrong channel state after wait of " + (waitEnd - 
waitStart) + " ms: " + result,
+                    result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED)));
             }
         }
     }
@@ -122,14 +129,19 @@ public class KeepAliveTest extends BaseTestSupport {
     @Test
     public void testClientWithHeartBeat() throws Exception {
         PropertyResolverUtils.updateProperty(client, 
ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT);
-        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
+                .verify(7L, TimeUnit.SECONDS)
+                .getSession()) {
             session.addPasswordIdentity(getCurrentTestName());
             session.auth().verify(5L, TimeUnit.SECONDS);
 
             try (ClientChannel channel = 
session.createChannel(Channel.CHANNEL_SHELL)) {
+                long waitStart = System.currentTimeMillis();
                 Collection<ClientChannelEvent> result =
-                        channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
WAIT);
-                assertTrue("Wrong channel state: " + result, 
result.contains(ClientChannelEvent.TIMEOUT));
+                    channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
WAIT);
+                long waitEnd = System.currentTimeMillis();
+                assertTrue("Wrong channel state after wait of " + (waitEnd - 
waitStart) + " ms: " + result,
+                    result.contains(ClientChannelEvent.TIMEOUT));
             }
         }
     }
@@ -138,7 +150,9 @@ public class KeepAliveTest extends BaseTestSupport {
     public void testShellClosedOnClientTimeout() throws Exception {
         TestEchoShell.latch = new CountDownLatch(1);
 
-        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
+                    .verify(7L, TimeUnit.SECONDS)
+                    .getSession()) {
             session.addPasswordIdentity(getCurrentTestName());
             session.auth().verify(5L, TimeUnit.SECONDS);
 
@@ -151,11 +165,13 @@ public class KeepAliveTest extends BaseTestSupport {
                 channel.open().verify(9L, TimeUnit.SECONDS);
 
                 assertTrue("Latch time out", TestEchoShell.latch.await(10L, 
TimeUnit.SECONDS));
+
+                long waitStart = System.currentTimeMillis();
                 Collection<ClientChannelEvent> result =
-                        channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
WAIT);
-                assertTrue("Wrong channel state: " + result,
-                           result.containsAll(
-                               EnumSet.of(ClientChannelEvent.CLOSED, 
ClientChannelEvent.OPENED)));
+                    channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
WAIT);
+                long waitEnd = System.currentTimeMillis();
+                assertTrue("Wrong channel state after wait of " + (waitEnd - 
waitStart) + " ms: " + result,
+                   result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED, 
ClientChannelEvent.OPENED)));
             }
         } finally {
             TestEchoShell.latch = null;
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index d06f0d6..f77839d 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -590,9 +590,12 @@ public class ClientTest extends BaseTestSupport {
                         }
                     });
 
+                long waitStart = System.currentTimeMillis();
                 Collection<ClientChannelEvent> result =
                     channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
TimeUnit.SECONDS.toMillis(15L));
-                assertFalse("Timeout while waiting for channel closure", 
result.contains(ClientChannelEvent.TIMEOUT));
+                long waitEnd = System.currentTimeMillis();
+                assertFalse("Timeout after " + (waitEnd - waitStart) + " ms. 
while waiting for channel closure",
+                    result.contains(ClientChannelEvent.TIMEOUT));
                 assertEquals("Mismatched sent and received data size", 
nbMessages * message.length, baosOut.size());
             }
 
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/channel/ChannelExecTest.java b/sshd-core/src/test/java/org/apache/sshd/client/channel/ChannelExecTest.java
index a338713..fc1f102 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/channel/ChannelExecTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/channel/ChannelExecTest.java
@@ -88,7 +88,9 @@ public class ChannelExecTest extends BaseTestSupport {
 
     @Test   // see SSHD-692
     public void testMultipleRemoteCommandExecutions() throws Exception {
-        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
+                .verify(7L, TimeUnit.SECONDS)
+                .getSession()) {
             session.addPasswordIdentity(getCurrentTestName());
             session.auth().verify(5L, TimeUnit.SECONDS);
 
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
index f06fa1e..a061f76 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java
@@ -50,6 +50,7 @@ import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.util.test.BaseTestSupport;
 import org.apache.sshd.util.test.NoIoTestCase;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
@@ -75,6 +76,7 @@ public class AbstractSessionTest extends BaseTestSupport {
         session = new MySession();
     }
 
+    @After
     public void tearDown() throws Exception {
         if (session != null) {
             session.close();
diff --git a/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java b/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
index 9b4b963..ed619b5 100644
--- a/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
+++ b/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java
@@ -77,7 +77,7 @@ public class ClientUserAuthServiceOld extends AbstractCloseable implements Servi
             throw new IllegalStateException("Client side service used on 
server side");
         }
         session = (ClientSessionImpl) s;
-        lock = session.getLock();
+        lock = session.getFutureLock();
         // Maintain the current auth status in the authFuture.
         authFuture = new DefaultAuthFuture(s.toString(), lock);
     }
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
index fb3b0fb..c9ee121 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
@@ -199,7 +199,7 @@ public class ServerTest extends BaseTestSupport {
 
     @Test
     public void testAuthenticationTimeout() throws Exception {
-        final long testAuthTimeout = TimeUnit.SECONDS.toMillis(5L);
+        final long testAuthTimeout = TimeUnit.SECONDS.toMillis(4L);
         PropertyResolverUtils.updateProperty(sshd, 
FactoryManager.AUTH_TIMEOUT, testAuthTimeout);
 
         AtomicReference<TimeoutStatus> timeoutHolder = new AtomicReference<>();
@@ -222,17 +222,19 @@ public class ServerTest extends BaseTestSupport {
         });
         sshd.start();
         client.start();
-        Collection<ClientSession.ClientSessionEvent> res;
         try (ClientSession s = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, sshd.getPort())
                 .verify(7L, TimeUnit.SECONDS)
                 .getSession()) {
-            res = 
s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 2L * 
testAuthTimeout);
+            long waitStart = System.currentTimeMillis();
+            Collection<ClientSession.ClientSessionEvent> res =
+                s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 
3L * testAuthTimeout);
+            long waitEnd = System.currentTimeMillis();
+            assertTrue("Invalid session state after " + (waitEnd - waitStart) 
+ " ms: " + res,
+                
res.containsAll(EnumSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH)));
         } finally {
             client.stop();
         }
 
-        assertTrue("Session should be closed: " + res,
-            
res.containsAll(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED, 
ClientSession.ClientSessionEvent.WAIT_AUTH)));
         assertSame("Mismatched timeout status reported", 
TimeoutStatus.AuthTimeout, timeoutHolder.getAndSet(null));
     }
 
@@ -301,7 +303,6 @@ public class ServerTest extends BaseTestSupport {
         sshd.start();
 
         client.start();
-        Collection<ClientSession.ClientSessionEvent> res;
         try (ClientSession s = createTestClientSession(sshd);
              ChannelShell shell = s.createShellChannel();
              ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -313,13 +314,16 @@ public class ServerTest extends BaseTestSupport {
             assertTrue("No changes in activated channels", 
channelListener.waitForActiveChannelsChange(5L, TimeUnit.SECONDS));
             assertTrue("No changes in open channels", 
channelListener.waitForOpenChannelsChange(5L, TimeUnit.SECONDS));
 
-            res = 
s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 2L * 
testIdleTimeout);
+            long waitStart = System.currentTimeMillis();
+            Collection<ClientSession.ClientSessionEvent> res =
+                s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 
3L * testIdleTimeout);
+            long waitEnd = System.currentTimeMillis();
+            assertTrue("Invalid session state after " + (waitEnd - waitStart) 
+ " ms: " + res,
+                
res.containsAll(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED, 
ClientSession.ClientSessionEvent.AUTHED)));
         } finally {
             client.stop();
         }
 
-        assertTrue("Session should be closed and authenticated: " + res,
-            
res.containsAll(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED, 
ClientSession.ClientSessionEvent.AUTHED)));
         assertTrue("Session latch not signalled in time", latch.await(1L, 
TimeUnit.SECONDS));
         assertTrue("Shell latch not signalled in time", 
TestEchoShell.latch.await(1L, TimeUnit.SECONDS));
         assertSame("Mismatched timeout status", TimeoutStatus.IdleTimeout, 
timeoutHolder.getAndSet(null));
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java
index 13dc910..ae15362 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java
@@ -19,6 +19,7 @@
 package org.apache.sshd.server.channel;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
@@ -71,26 +72,33 @@ public class ChannelSessionTest extends BaseTestSupport {
             server.start();
             client.start();
 
-            try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, server.getPort()).verify(7L, TimeUnit.SECONDS).getSession()) {
+            try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, server.getPort())
+                    .verify(7L, TimeUnit.SECONDS)
+                    .getSession()) {
                 session.addPasswordIdentity(getCurrentTestName());
                 session.auth().verify(5L, TimeUnit.SECONDS);
 
                 try (ClientChannel channel = 
session.createChannel(Channel.CHANNEL_SHELL)) {
+                    channel.open().verify(7L, TimeUnit.SECONDS);
 
-                    channel.open().await();
-
-                    channel.getInvertedIn().write("echo 
foo\nexit\n".getBytes());
-                    channel.getInvertedIn().flush();
+                    OutputStream invertedIn = channel.getInvertedIn();
+                    String cmdSent = "echo foo\nexit\n";
+                    invertedIn.write(cmdSent.getBytes());
+                    invertedIn.flush();
 
+                    long waitStart = System.currentTimeMillis();
                     Collection<ClientChannelEvent> result =
-                            
channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 5000);
-                    assertTrue("Wrong channel state: " + result, 
result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED)));
+                        channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
TimeUnit.SECONDS.toMillis(11L));
+                    long waitEnd = System.currentTimeMillis();
+                    assertTrue("Wrong channel state after " + (waitEnd - 
waitStart) + " ms.: " + result,
+                        
result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED)));
 
                     byte[] b = new byte[1024];
-                    int l = channel.getInvertedOut().read(b);
-                    String s = l > 0 ? new String(b, 0, l) : "";
+                    InputStream invertedOut = channel.getInvertedOut();
+                    int l = invertedOut.read(b);
+                    String cmdReceived = (l > 0) ? new String(b, 0, l) : "";
 
-                    assertEquals("echo foo\nexit\n", s);
+                    assertEquals("Mismatched echoed command", cmdSent, 
cmdReceived);
                 }
             }
         }
@@ -111,13 +119,14 @@ public class ChannelSessionTest extends BaseTestSupport {
                 }
         }) {
             AtomicBoolean expanded = new AtomicBoolean(false);
-            channelSession.asyncOut = new ChannelAsyncOutputStream(new 
BogusChannel(), (byte) 0) {
-                @Override
-                public void onWindowExpanded() throws IOException {
-                    expanded.set(true);
-                    super.onWindowExpanded();
-                }
-            };
+            channelSession.asyncOut =
+                new ChannelAsyncOutputStream(new BogusChannel(), (byte) 0) {
+                    @Override
+                    public void onWindowExpanded() throws IOException {
+                        expanded.set(true);
+                        super.onWindowExpanded();
+                    }
+                };
             channelSession.handleWindowAdjust(buffer);
             assertTrue("Expanded ?", expanded.get());
         }
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
index afab75d..8700e7a 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java
@@ -50,7 +50,7 @@ public class BogusChannel extends AbstractChannel {
 
     @Override
     public OpenFuture open(int recipient, long rwsize, long rmpsize, Buffer 
buffer) {
-        return new DefaultOpenFuture(this, this.lock);
+        return new DefaultOpenFuture(this, this.futureLock);
     }
 
     @Override
diff --git a/sshd-mina/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java b/sshd-mina/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
index 449a0b0..1b231ff 100644
--- a/sshd-mina/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
+++ b/sshd-mina/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
@@ -107,7 +107,7 @@ public class MinaSession extends AbstractInnerCloseable implements IoSession {
         return new IoBaseCloseable() {
             @SuppressWarnings("synthetic-access")
             private final DefaultCloseFuture future =
-                new DefaultCloseFuture(MinaSession.this.toString(), lock);
+                new DefaultCloseFuture(MinaSession.this.toString(), 
futureLock);
 
             @SuppressWarnings("synthetic-access")
             @Override
diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java
index 96351f6..98d78ed 100644
--- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java
+++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.io.IoAcceptor;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.io.IoServiceEventListener;
@@ -59,7 +58,6 @@ import io.netty.util.concurrent.GlobalEventExecutor;
  */
 public class NettyIoAcceptor extends NettyIoService implements IoAcceptor {
     protected final ServerBootstrap bootstrap = new ServerBootstrap();
-    protected final DefaultCloseFuture closeFuture = new 
DefaultCloseFuture(toString(), lock);
     protected final Map<SocketAddress, Channel> boundAddresses = new 
ConcurrentHashMap<>();
 
     public NettyIoAcceptor(NettyIoServiceFactory factory, IoHandler handler) {
diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java
index 64db5d8..580765d 100644
--- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java
+++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java
@@ -22,9 +22,9 @@ package org.apache.sshd.netty;
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.io.AbstractIoWriteFuture;
 import org.apache.sshd.common.io.IoConnectFuture;
 import org.apache.sshd.common.io.IoHandler;
@@ -54,7 +54,6 @@ public class NettyIoSession extends AbstractCloseable implements IoSession {
     protected final Map<Object, Object> attributes = new HashMap<>();
     protected final NettyIoService service;
     protected final IoHandler handler;
-    protected final DefaultCloseFuture closeFuture = new 
DefaultCloseFuture(toString(), lock);
     protected final long id;
     protected ChannelHandlerContext context;
     protected SocketAddress remoteAddr;
@@ -64,6 +63,8 @@ public class NettyIoSession extends AbstractCloseable implements IoSession {
     private final SocketAddress acceptanceAddress;
 
     public NettyIoSession(NettyIoService service, IoHandler handler, 
SocketAddress acceptanceAddress) {
+        super(Objects.toString(acceptanceAddress, ""));
+
         this.service = service;
         this.handler = handler;
         this.id = service.sessionSeq.incrementAndGet();

Reply via email to