This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push: new ba8095e [SSHD-912] Use separate locks for Future(s) and Session/Channel instances ba8095e is described below commit ba8095edaa2dc190661f4c1d4b9a6260bce2d51b Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Tue Apr 30 08:59:39 2019 +0300 [SSHD-912] Use separate locks for Future(s) and Session/Channel instances --- CHANGES.md | 4 +- .../common/util/closeable/AbstractCloseable.java | 12 ++- .../sshd/client/channel/AbstractClientChannel.java | 75 ++++++++++------ .../sshd/client/channel/ChannelDirectTcpip.java | 2 +- .../keyverifier/DelegatingServerKeyVerifier.java | 17 ++-- .../apache/sshd/client/session/ClientSession.java | 14 ++- .../sshd/client/session/ClientSessionImpl.java | 99 ++++++++++++++-------- .../sshd/client/session/ClientUserAuthService.java | 2 +- .../sshd/common/channel/AbstractChannel.java | 6 +- .../sshd/common/forward/TcpipClientChannel.java | 2 +- .../session/helpers/AbstractConnectionService.java | 24 ++++-- .../common/session/helpers/AbstractSession.java | 12 +-- .../sshd/common/session/helpers/SessionHelper.java | 10 +-- .../apache/sshd/server/channel/ChannelSession.java | 2 +- .../sshd/server/x11/ChannelForwardedX11.java | 2 +- .../test/java/org/apache/sshd/KeepAliveTest.java | 46 ++++++---- .../java/org/apache/sshd/client/ClientTest.java | 5 +- .../sshd/client/channel/ChannelExecTest.java | 4 +- .../session/helpers/AbstractSessionTest.java | 2 + .../sshd/deprecated/ClientUserAuthServiceOld.java | 2 +- .../java/org/apache/sshd/server/ServerTest.java | 22 +++-- .../sshd/server/channel/ChannelSessionTest.java | 43 ++++++---- .../org/apache/sshd/util/test/BogusChannel.java | 2 +- .../apache/sshd/common/io/mina/MinaSession.java | 2 +- .../org/apache/sshd/netty/NettyIoAcceptor.java | 2 - .../java/org/apache/sshd/netty/NettyIoSession.java | 5 +- 26 files changed, 267 insertions(+), 151 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 273852b..182c736 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -126,4 +126,6 @@ and therefore closing all currently tracked file/directory handles. * [SSHD-909](https://issues.apache.org/jira/browse/SSHD-909) - SFTP versions extension handler ignores non-numerical versions when resolving the available ones. -* [SSHD-913](https://issues.apache.org/jira/browse/SSHD-913) - Provide channel session instance to command and/or shell factories creators \ No newline at end of file +* [SSHD-913](https://issues.apache.org/jira/browse/SSHD-913) - Provide channel session instance to command and/or shell factories creators + +* [SSHD-912](https://issues.apache.org/jira/browse/SSHD-912) - Use separate locks for Future(s) and Session/Channel instances. diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java b/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java index 6413ebb..e503431 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java @@ -37,9 +37,9 @@ public abstract class AbstractCloseable extends IoBaseCloseable { } /** - * Lock object for this session state + * Lock object for {@code Future}-s based on this closeable instance */ - protected final Object lock = new Object(); + protected final Object futureLock = new Object(); /** * State of this object @@ -57,7 +57,11 @@ public abstract class AbstractCloseable extends IoBaseCloseable { protected AbstractCloseable(String discriminator) { super(discriminator); - closeFuture = new DefaultCloseFuture(discriminator, lock); + closeFuture = new DefaultCloseFuture(discriminator, futureLock); + } + + public Object getFutureLock() { + return futureLock; } @Override @@ -157,6 +161,6 @@ public abstract class AbstractCloseable extends IoBaseCloseable { } protected Builder builder() { - return new Builder(lock); + return new Builder(futureLock); } } \ No newline at end of file diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java index 2673031..611e789 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -211,10 +212,15 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C @Override public Set<ClientChannelEvent> waitFor(Collection<ClientChannelEvent> mask, long timeout) { Objects.requireNonNull(mask, "No mask specified"); - long t = 0; boolean debugEnabled = log.isDebugEnabled(); boolean traceEnabled = log.isTraceEnabled(); - synchronized (lock) { + long startTime = System.currentTimeMillis(); + /* + * NOTE !!! we must use the futureLock since some of the events that + * we wait on are related to open/close future(s) + */ + synchronized (futureLock) { + long remWait = timeout; for (Set<ClientChannelEvent> cond = EnumSet.noneOf(ClientChannelEvent.class);; cond.clear()) { updateCurrentChannelState(cond); if (debugEnabled) { @@ -229,48 +235,57 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C boolean nothingInCommon = Collections.disjoint(mask, cond); if (!nothingInCommon) { if (traceEnabled) { - log.trace("WaitFor call returning on channel {}, mask={}, cond={}", this, mask, cond); + log.trace("waitFor({}) call returning mask={}, cond={}", this, mask, cond); } return cond; } if (timeout > 0L) { - if (t == 0L) { - t = System.currentTimeMillis() + timeout; - } else { - timeout = t - System.currentTimeMillis(); - if (timeout <= 0L) { - if (traceEnabled) { - log.trace("WaitFor call timeout on channel {}, mask={}", this, mask); - } - cond.add(ClientChannelEvent.TIMEOUT); - return cond; + long now = System.currentTimeMillis(); + long usedTime = now - startTime; + if ((usedTime >= timeout) || (remWait <= 0L)) { + if (traceEnabled) { + log.trace("waitFor({}) call timeout {}/{} for mask={}: {}", + this, usedTime, timeout, mask, cond); } + cond.add(ClientChannelEvent.TIMEOUT); + return cond; } } if (traceEnabled) { - log.trace("Waiting {} millis for lock on channel {}, mask={}, cond={}", timeout, this, mask, cond); + log.trace("waitFor({}) waiting {} millis for lock - mask={}, cond={}", + this, remWait, mask, cond); } long nanoStart = System.nanoTime(); try { if (timeout > 0L) { - lock.wait(timeout); + futureLock.wait(remWait); } else { - lock.wait(); + futureLock.wait(); } long nanoEnd = System.nanoTime(); long nanoDuration = nanoEnd - nanoStart; if (traceEnabled) { - log.trace("Lock notified on channel {} after {} nanos", this, nanoDuration); + log.trace("waitFor({}) lock notified on channel after {} nanos", this, nanoDuration); + } + + if (timeout > 0L) { + long waitDuration = + TimeUnit.MILLISECONDS.convert(nanoDuration, TimeUnit.NANOSECONDS); + if (waitDuration <= 0L) { + waitDuration = 123L; + } + remWait -= waitDuration; } } catch (InterruptedException e) { long nanoEnd = System.nanoTime(); long nanoDuration = nanoEnd - nanoStart; if (traceEnabled) { - log.trace("waitFor({}) mask={} - ignoring interrupted exception after {} nanos", this, mask, nanoDuration); + log.trace("waitFor({}) mask={} - ignoring interrupted exception after {} nanos", + this, mask, nanoDuration); } } } @@ -279,8 +294,9 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C @Override public Set<ClientChannelEvent> getChannelState() { - Set<ClientChannelEvent> cond = EnumSet.noneOf(ClientChannelEvent.class); - synchronized (lock) { + Set<ClientChannelEvent> cond = + EnumSet.noneOf(ClientChannelEvent.class); + synchronized (futureLock) { return updateCurrentChannelState(cond); } } @@ -312,7 +328,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C throw new SshException("Session has been closed"); } - openFuture = new DefaultOpenFuture(this.toString(), lock); + openFuture = new DefaultOpenFuture(this.toString(), futureLock); String type = getChannelType(); if (log.isDebugEnabled()) { log.debug("open({}) Send SSH_MSG_CHANNEL_OPEN - type={}", this, type); @@ -320,7 +336,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C Session session = getSession(); Window wLocal = getLocalWindow(); - Buffer buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN, type.length() + Integer.SIZE); + Buffer buffer = + session.createBuffer(SshConstants.SSH_MSG_CHANNEL_OPEN, type.length() + Integer.SIZE); buffer.putString(type); buffer.putInt(getId()); buffer.putInt(wLocal.getSize()); @@ -331,7 +348,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C @Override public OpenFuture open(int recipient, long rwSize, long packetSize, Buffer buffer) { - throw new UnsupportedOperationException("open(" + recipient + "," + rwSize + "," + packetSize + ") N/A"); + throw new UnsupportedOperationException( + "open(" + recipient + "," + rwSize + "," + packetSize + ") N/A"); } @Override @@ -339,7 +357,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C setRecipient(recipient); Session session = getSession(); - FactoryManager manager = Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); + FactoryManager manager = + Objects.requireNonNull(session.getFactoryManager(), "No factory manager"); Window wRemote = getRemoteWindow(); wRemote.init(rwSize, packetSize, manager); @@ -371,7 +390,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C String lang = buffer.getString(); if (log.isDebugEnabled()) { log.debug("handleOpenFailure({}) reason={}, lang={}, msg={}", - this, SshConstants.getOpenErrorCodeName(reason), lang, msg); + this, SshConstants.getOpenErrorCodeName(reason), lang, msg); } this.openFailureReason = reason; @@ -389,7 +408,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C if (isClosing()) { return; } - ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds int boundaries: %d", len); + ValidateUtils.checkTrue( + len <= Integer.MAX_VALUE, "Data length exceeds int boundaries: %d", len); if (asyncOut != null) { asyncOut.write(new ByteArrayBuffer(data, off, (int) len)); @@ -412,7 +432,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C if (isClosing()) { return; } - ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Extended data length exceeds int boundaries: %d", len); + ValidateUtils.checkTrue( + len <= Integer.MAX_VALUE, "Extended data length exceeds int boundaries: %d", len); if (asyncErr != null) { asyncErr.write(new ByteArrayBuffer(data, off, (int) len)); diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java index 52b7849..55ffcb8 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java @@ -71,7 +71,7 @@ public class ChannelDirectTcpip extends AbstractClientChannel { throw new SshException("Session has been closed"); } - openFuture = new DefaultOpenFuture(remote, lock); + openFuture = new DefaultOpenFuture(remote, futureLock); if (log.isDebugEnabled()) { log.debug("open({}) SSH_MSG_CHANNEL_OPEN", this); } diff --git a/sshd-core/src/main/java/org/apache/sshd/client/keyverifier/DelegatingServerKeyVerifier.java b/sshd-core/src/main/java/org/apache/sshd/client/keyverifier/DelegatingServerKeyVerifier.java index 9c272aa..419b82f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/keyverifier/DelegatingServerKeyVerifier.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/keyverifier/DelegatingServerKeyVerifier.java @@ -25,10 +25,10 @@ import java.util.Map; import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.common.util.logging.AbstractLoggingBean; -/* - * A ServerKeyVerifier that delegates verification to the ServerKeyVerifier found in the ClientSession metadata - * The ServerKeyVerifier can be specified at the SshClient level, which may have connections to multiple hosts. - * This technique lets each connection have its own ServerKeyVerifier. +/** + * A {@link ServerKeyVerifier} that delegates verification to the instance found in the {@link ClientSession} metadata + * The verifier can be specified at the {@code SshClient} level, which may have connections to multiple hosts. + * This technique lets each connection have its own verifier instance. * * @author <a href="mailto:d...@mina.apache.org">Apache MINA SSHD Project</a> */ @@ -38,16 +38,17 @@ public class DelegatingServerKeyVerifier extends AbstractLoggingBean implements } @Override - public boolean verifyServerKey(ClientSession sshClientSession, SocketAddress remoteAddress, PublicKey serverKey) { - Map<Object, Object> metadataMap = sshClientSession.getMetadataMap(); + public boolean verifyServerKey( + ClientSession session, SocketAddress remoteAddress, PublicKey serverKey) { + Map<Object, Object> metadataMap = session.getMetadataMap(); Object verifier = metadataMap.get(ServerKeyVerifier.class); if (verifier == null) { if (log.isTraceEnabled()) { - log.trace("verifyServerKey(" + remoteAddress + ") No verifier found in ClientSession metadata; accepting server key"); + log.trace("verifyServerKey({}) No verifier found in ClientSession metadata; accepting server key", remoteAddress); } return true; } // We throw if it's not a ServerKeyVerifier... - return ((ServerKeyVerifier) verifier).verifyServerKey(sshClientSession, remoteAddress, serverKey); + return ((ServerKeyVerifier) verifier).verifyServerKey(session, remoteAddress, serverKey); } } diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java index 950227c..5dccc3c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSession.java @@ -370,20 +370,28 @@ public interface ClientSession } /** + * @return A snapshot of the current session state + * @see #waitFor(Collection, long) + */ + Set<ClientSessionEvent> getSessionState(); + + /** * Wait for any one of a specific state to be signaled. * * @param mask The request {@link ClientSessionEvent}s mask * @param timeout Wait time in milliseconds - non-positive means forever * @return The actual state that was detected either due to the mask - * yielding one of the states or due to timeout (in which case the {@link ClientSessionEvent#TIMEOUT} - * value is set) + * yielding one of the states or due to timeout (in which case the + * {@link ClientSessionEvent#TIMEOUT} value is set) */ Set<ClientSessionEvent> waitFor(Collection<ClientSessionEvent> mask, long timeout); /** * Access to the metadata. * - * @return The metadata {@link Map} + * @return The metadata {@link Map} - <B>Note:</B> access to the map + * is not {@code synchronized} in any way - up to the user to take care + * of mutual exclusion if necessary */ Map<Object, Object> getMetadataMap(); diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java index bc9c6a2..73558ac 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.sshd.client.ClientFactoryManager; import org.apache.sshd.client.future.AuthFuture; @@ -85,7 +86,7 @@ public class ClientSessionImpl extends AbstractClientSession { nextServiceFactory = null; } - authFuture = new DefaultAuthFuture(ioSession.getRemoteAddress(), lock); + authFuture = new DefaultAuthFuture(ioSession.getRemoteAddress(), futureLock); authFuture.setAuthed(false); signalSessionCreated(ioSession); @@ -118,9 +119,11 @@ public class ClientSessionImpl extends AbstractClientSession { } ClientUserAuthService authService = getUserAuthService(); - synchronized (lock) { + synchronized (sessionLock) { String serviceName = nextServiceName(); - authFuture = ValidateUtils.checkNotNull(authService.auth(serviceName), "No auth future generated by service=%s", serviceName); + authFuture = + ValidateUtils.checkNotNull( + authService.auth(serviceName), "No auth future generated by service=%s", serviceName); return authFuture; } } @@ -145,7 +148,7 @@ public class ClientSessionImpl extends AbstractClientSession { protected void signalAuthFailure(AuthFuture future, Throwable t) { boolean signalled = false; - synchronized (lock) { + synchronized (sessionLock) { if ((future != null) && (!future.isDone())) { future.setException(t); signalled = true; @@ -159,13 +162,13 @@ public class ClientSessionImpl extends AbstractClientSession { } protected String nextServiceName() { - synchronized (lock) { + synchronized (sessionLock) { return nextServiceFactory.getName(); } } public void switchToNextService() throws IOException { - synchronized (lock) { + synchronized (sessionLock) { if (nextService == null) { throw new IllegalStateException("No service available"); } @@ -182,8 +185,8 @@ public class ClientSessionImpl extends AbstractClientSession { if (SessionListener.Event.KeyEstablished.equals(event)) { sendInitialServiceRequest(); } - synchronized (lock) { - lock.notifyAll(); + synchronized (futureLock) { + futureLock.notifyAll(); } super.signalSessionEvent(event); } @@ -211,53 +214,49 @@ public class ClientSessionImpl extends AbstractClientSession { @Override public Set<ClientSessionEvent> waitFor(Collection<ClientSessionEvent> mask, long timeout) { Objects.requireNonNull(mask, "No mask specified"); - long t = 0L; boolean traceEnabled = log.isTraceEnabled(); - synchronized (lock) { + long startTime = System.currentTimeMillis(); + /* + * NOTE: we need to use the futureLock since some of the events + * depend on auth/kex/close future(s) + */ + synchronized (futureLock) { + long remWait = timeout; for (Set<ClientSessionEvent> cond = EnumSet.noneOf(ClientSessionEvent.class);; cond.clear()) { - if (closeFuture.isClosed()) { - cond.add(ClientSessionEvent.CLOSED); - } - if (isAuthenticated()) { // authFuture.isSuccess() - cond.add(ClientSessionEvent.AUTHED); - } - if (KexState.DONE.equals(kexState.get()) && authFuture.isFailure()) { - cond.add(ClientSessionEvent.WAIT_AUTH); - } + updateCurrentSessionState(cond); boolean nothingInCommon = Collections.disjoint(cond, mask); if (!nothingInCommon) { if (traceEnabled) { - log.trace("waitFor(}{}) call return mask={}, cond={}", this, mask, cond); + log.trace("waitFor({}) call return mask={}, cond={}", this, mask, cond); } return cond; } if (timeout > 0L) { - if (t == 0L) { - t = System.currentTimeMillis() + timeout; - } else { - timeout = t - System.currentTimeMillis(); - if (timeout <= 0L) { - if (traceEnabled) { - log.trace("WaitFor({}) call timeout mask={}", this, mask); - } - cond.add(ClientSessionEvent.TIMEOUT); - return cond; + long now = System.currentTimeMillis(); + long usedTime = now - startTime; + if ((usedTime >= timeout) || (remWait <= 0L)) { + if (traceEnabled) { + log.trace("waitFor({}) call timeout {}/{} for mask={}: {}", + this, usedTime, timeout, mask, cond); } + cond.add(ClientSessionEvent.TIMEOUT); + return cond; } } if (traceEnabled) { - log.trace("waitFor({}) Waiting {} millis for lock on mask={}, cond={}", this, timeout, mask, cond); + log.trace("waitFor({}) Waiting {} millis for lock on mask={}, cond={}", + this, timeout, mask, cond); } long nanoStart = System.nanoTime(); try { - if (timeout > 0) { - lock.wait(timeout); + if (timeout > 0L) { + futureLock.wait(remWait); } else { - lock.wait(); + futureLock.wait(); } long nanoEnd = System.nanoTime(); @@ -265,6 +264,15 @@ public class ClientSessionImpl extends AbstractClientSession { if (traceEnabled) { log.trace("waitFor({}) Lock notified after {} nanos", this, nanoDuration); } + + if (timeout > 0L) { + long waitDuration = + TimeUnit.MILLISECONDS.convert(nanoDuration, TimeUnit.NANOSECONDS); + if (waitDuration <= 0L) { + waitDuration = 123L; + } + remWait -= waitDuration; + } } catch (InterruptedException e) { long nanoEnd = System.nanoTime(); long nanoDuration = nanoEnd - nanoStart; @@ -277,6 +285,29 @@ public class ClientSessionImpl extends AbstractClientSession { } @Override + public Set<ClientSessionEvent> getSessionState() { + Set<ClientSessionEvent> state = EnumSet.noneOf(ClientSessionEvent.class); + synchronized (futureLock) { + return updateCurrentSessionState(state); + } + } + + // NOTE: assumed to be called under lock + protected <C extends Collection<ClientSessionEvent>> C updateCurrentSessionState(C state) { + if (closeFuture.isClosed()) { + state.add(ClientSessionEvent.CLOSED); + } + if (isAuthenticated()) { // authFuture.isSuccess() + state.add(ClientSessionEvent.AUTHED); + } + if (KexState.DONE.equals(kexState.get()) && authFuture.isFailure()) { + state.add(ClientSessionEvent.WAIT_AUTH); + } + + return state; + } + + @Override public Map<Object, Object> getMetadataMap() { return metadataMap; } diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java index e45fed7..a8cb5e9 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientUserAuthService.java @@ -134,7 +134,7 @@ public class ClientUserAuthService ClientSession session = getClientSession(); // check if any previous future in use - AuthFuture authFuture = new DefaultAuthFuture(service, clientSession.getLock()); + AuthFuture authFuture = new DefaultAuthFuture(service, clientSession.getFutureLock()); AuthFuture currentFuture = authFutureHolder.getAndSet(authFuture); boolean debugEnabled = log.isDebugEnabled(); if (currentFuture != null) { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java index befc90a..fd0a001 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java @@ -127,7 +127,7 @@ public abstract class AbstractChannel Collection<? extends RequestHandler<Channel>> handlers, CloseableExecutorService executorService) { super(discriminator); - gracefulFuture = new DefaultCloseFuture(discriminator, lock); + gracefulFuture = new DefaultCloseFuture(discriminator, futureLock); localWindow = new Window(this, null, client, true); remoteWindow = new Window(this, null, client, false); channelListenerProxy = EventListenerUtils.proxyWrapper( @@ -477,8 +477,8 @@ public abstract class AbstractChannel log.debug("notifyStateChanged(" + this + ")[" + hint + "] channel state signalling failure details", e); } } finally { - synchronized (lock) { - lock.notifyAll(); + synchronized (futureLock) { + futureLock.notifyAll(); } } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java index 37dd824..b99e5d3 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java @@ -135,7 +135,7 @@ public class TcpipClientChannel extends AbstractClientChannel implements Forward } // make sure the pending messages queue is 1st in line - openFuture = new DefaultOpenFuture(src, lock) + openFuture = new DefaultOpenFuture(src, futureLock) .addListener(getPendingMessagesQueue()); if (log.isDebugEnabled()) { log.debug("open({}) send SSH_MSG_CHANNEL_OPEN", this); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java index 4b40ff1..77cf06c 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java @@ -105,7 +105,6 @@ public abstract class AbstractConnectionService */ protected final AtomicInteger nextChannelId = new AtomicInteger(0); protected final AtomicLong heartbeatCount = new AtomicLong(0L); - private ScheduledFuture<?> heartBeat; private final AtomicReference<AgentForwardSupport> agentForwardHolder = new AtomicReference<>(); @@ -417,7 +416,7 @@ public abstract class AbstractConnectionService channel.init(this, session, channelId); boolean registered = false; - synchronized (lock) { + synchronized (channels) { if (!isClosing()) { channels.put(channelId, channel); registered = true; @@ -451,7 +450,11 @@ public abstract class AbstractConnectionService @Override public void unregisterChannel(Channel channel) { int channelId = channel.getId(); - Channel result = channels.remove(channelId); + Channel result; + synchronized (channels) { + result = channels.remove(channelId); + } + if (log.isDebugEnabled()) { log.debug("unregisterChannel({}) result={}", channel, result); } @@ -565,10 +568,20 @@ public abstract class AbstractConnectionService } int id = channel.getId(); - if (log.isDebugEnabled()) { + boolean debugEnabled = log.isDebugEnabled(); + if (debugEnabled) { log.debug("channelOpenFailure({}) Received SSH_MSG_CHANNEL_OPEN_FAILURE", channel); } - channels.remove(id); + + Channel removed; + synchronized (channels) { + removed = channels.remove(id); + } + + if (debugEnabled) { + log.debug("channelOpenFailure({}) unregistered {}", channel, removed); + } + channel.handleOpenFailure(buffer); } @@ -715,7 +728,6 @@ public abstract class AbstractConnectionService // Throw a special exception - SSHD-777 throw new SshChannelNotFoundException(recipient, "Received " + SshConstants.getCommandMessageName(cmd) + " on unknown channel " + recipient); - } channel = handler.handleUnknownChannelCommand(this, cmd, recipient, buffer); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index b137681..827c20f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -358,7 +358,7 @@ public abstract class AbstractSession extends SessionHelper { */ protected void handleMessage(Buffer buffer) throws Exception { try { - synchronized (lock) { + synchronized (sessionLock) { doHandleMessage(buffer); } } catch (Throwable e) { @@ -658,7 +658,8 @@ public abstract class AbstractSession extends SessionHelper { protected void handleNewKeys(int cmd, Buffer buffer) throws Exception { boolean debugEnabled = log.isDebugEnabled(); if (debugEnabled) { - log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS command={}", this, SshConstants.getCommandMessageName(cmd)); + log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS command={}", + this, SshConstants.getCommandMessageName(cmd)); } validateKexState(cmd, KexState.KEYS); receiveNewKeys(); @@ -697,8 +698,8 @@ public abstract class AbstractSession extends SessionHelper { } } - synchronized (lock) { - lock.notifyAll(); + synchronized (futureLock) { + futureLock.notifyAll(); } } @@ -1809,7 +1810,8 @@ public abstract class AbstractSession extends SessionHelper { + " to generate keys for exchange: " + e.getMessage()), e); } - return ValidateUtils.checkNotNull(kexFutureHolder.get(), "No current KEX future on state=%s", kexState.get()); + return ValidateUtils.checkNotNull( + kexFutureHolder.get(), "No current KEX future on state=%s", kexState.get()); } /** diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java index 86f479b..3ae0e45 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java @@ -76,6 +76,9 @@ import org.apache.sshd.common.util.net.SshdSocketAddress; * Contains split code in order to make {@link AbstractSession} class smaller */ public abstract class SessionHelper extends AbstractKexFactoryManager implements Session { + /** Session level lock for regulating access to sensitive data */ + protected final Object sessionLock = new Object(); + /** * Client or server side */ @@ -231,7 +234,8 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements } long now = System.currentTimeMillis(); - Map.Entry<TimeoutStatus, String> result = checkAuthenticationTimeout(now, getAuthTimeout()); + Map.Entry<TimeoutStatus, String> result = + checkAuthenticationTimeout(now, getAuthTimeout()); if (result == null) { result = checkIdleTimeout(now, getIdleTimeout()); } @@ -337,10 +341,6 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements return timeoutStatus.get(); } - public Object getLock() { - return lock; - } - @Override public ReservedSessionMessagesHandler getReservedSessionMessagesHandler() { return resolveEffectiveProvider(ReservedSessionMessagesHandler.class, diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java index 906d0a5..cf04394 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java @@ -113,7 +113,7 @@ public class ChannelSession extends AbstractServerChannel { public ChannelSession(Collection<? extends RequestHandler<Channel>> handlers) { super("", handlers, null); - commandExitFuture = new DefaultCloseFuture(getClass().getSimpleName(), lock); + commandExitFuture = new DefaultCloseFuture(getClass().getSimpleName(), futureLock); } @Override diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java index 8865854..a7abc9f 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/ChannelForwardedX11.java @@ -53,7 +53,7 @@ public class ChannelForwardedX11 extends AbstractClientChannel { if (closeFuture.isClosed()) { throw new SshException("Session has been closed"); } - openFuture = new DefaultOpenFuture(remote, lock); + openFuture = new DefaultOpenFuture(remote, futureLock); Session session = getSession(); if (log.isDebugEnabled()) { diff --git a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java index b2a164e..efba1ae 100644 --- a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java @@ -53,8 +53,8 @@ import org.junit.runners.MethodSorters; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class KeepAliveTest extends BaseTestSupport { private static final long HEARTBEAT = TimeUnit.SECONDS.toMillis(2L); - private static final long TIMEOUT = 2L * HEARTBEAT; - private static final long WAIT = 2L * TIMEOUT; + private static final long TIMEOUT = HEARTBEAT * 2L; + private static final long WAIT = 3L * TIMEOUT; private static SshServer sshd; private static int port; @@ -101,20 +101,27 @@ public class KeepAliveTest extends BaseTestSupport { @After public void tearDown() { - PropertyResolverUtils.updateProperty(sshd, FactoryManager.IDLE_TIMEOUT, FactoryManager.DEFAULT_IDLE_TIMEOUT); - PropertyResolverUtils.updateProperty(client, ClientFactoryManager.HEARTBEAT_INTERVAL, ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL); + PropertyResolverUtils.updateProperty( + sshd, FactoryManager.IDLE_TIMEOUT, FactoryManager.DEFAULT_IDLE_TIMEOUT); + PropertyResolverUtils.updateProperty( + client, ClientFactoryManager.HEARTBEAT_INTERVAL, ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL); } @Test public void testIdleClient() throws Exception { - try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) { + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + .verify(7L, TimeUnit.SECONDS) + .getSession()) { session.addPasswordIdentity(getCurrentTestName()); session.auth().verify(5L, TimeUnit.SECONDS); try (ClientChannel channel = session.createChannel(Channel.CHANNEL_SHELL)) { + long waitStart = System.currentTimeMillis(); Collection<ClientChannelEvent> result = - channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), WAIT); - assertTrue("Wrong channel state: " + result, result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED))); + channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), WAIT); + long waitEnd = System.currentTimeMillis(); + assertTrue("Wrong channel state after wait of " + (waitEnd - waitStart) + " ms: " + result, + result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED))); } } } @@ -122,14 +129,19 @@ public class KeepAliveTest extends BaseTestSupport { @Test public void testClientWithHeartBeat() throws Exception { PropertyResolverUtils.updateProperty(client, ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT); - try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) { + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + .verify(7L, TimeUnit.SECONDS) + .getSession()) { session.addPasswordIdentity(getCurrentTestName()); session.auth().verify(5L, TimeUnit.SECONDS); try (ClientChannel channel = session.createChannel(Channel.CHANNEL_SHELL)) { + long waitStart = System.currentTimeMillis(); Collection<ClientChannelEvent> result = - channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), WAIT); - assertTrue("Wrong channel state: " + result, result.contains(ClientChannelEvent.TIMEOUT)); + channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), WAIT); + long waitEnd = System.currentTimeMillis(); + assertTrue("Wrong channel state after wait of " + (waitEnd - waitStart) + " ms: " + result, + result.contains(ClientChannelEvent.TIMEOUT)); } } } @@ -138,7 +150,9 @@ public class KeepAliveTest extends BaseTestSupport { public void testShellClosedOnClientTimeout() throws Exception { TestEchoShell.latch = new CountDownLatch(1); - try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) { + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + .verify(7L, TimeUnit.SECONDS) + .getSession()) { session.addPasswordIdentity(getCurrentTestName()); session.auth().verify(5L, TimeUnit.SECONDS); @@ -151,11 +165,13 @@ public class KeepAliveTest extends BaseTestSupport { channel.open().verify(9L, TimeUnit.SECONDS); assertTrue("Latch time out", TestEchoShell.latch.await(10L, TimeUnit.SECONDS)); + + long waitStart = System.currentTimeMillis(); Collection<ClientChannelEvent> result = - channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), WAIT); - assertTrue("Wrong channel state: " + result, - result.containsAll( - EnumSet.of(ClientChannelEvent.CLOSED, ClientChannelEvent.OPENED))); + channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), WAIT); + long waitEnd = System.currentTimeMillis(); + assertTrue("Wrong channel state after wait of " + (waitEnd - waitStart) + " ms: " + result, + result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED, ClientChannelEvent.OPENED))); } } finally { TestEchoShell.latch = null; diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java index d06f0d6..f77839d 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java @@ -590,9 +590,12 @@ public class ClientTest extends BaseTestSupport { } }); + long waitStart = System.currentTimeMillis(); Collection<ClientChannelEvent> result = channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), TimeUnit.SECONDS.toMillis(15L)); - assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannelEvent.TIMEOUT)); + long waitEnd = System.currentTimeMillis(); + assertFalse("Timeout after " + (waitEnd - waitStart) + " ms. while waiting for channel closure", + result.contains(ClientChannelEvent.TIMEOUT)); assertEquals("Mismatched sent and received data size", nbMessages * message.length, baosOut.size()); } diff --git a/sshd-core/src/test/java/org/apache/sshd/client/channel/ChannelExecTest.java b/sshd-core/src/test/java/org/apache/sshd/client/channel/ChannelExecTest.java index a338713..fc1f102 100644 --- a/sshd-core/src/test/java/org/apache/sshd/client/channel/ChannelExecTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/client/channel/ChannelExecTest.java @@ -88,7 +88,9 @@ public class ChannelExecTest extends BaseTestSupport { @Test // see SSHD-692 public void testMultipleRemoteCommandExecutions() throws Exception { - try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) { + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + .verify(7L, TimeUnit.SECONDS) + .getSession()) { session.addPasswordIdentity(getCurrentTestName()); session.auth().verify(5L, TimeUnit.SECONDS); diff --git a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java index f06fa1e..a061f76 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/session/helpers/AbstractSessionTest.java @@ -50,6 +50,7 @@ import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; import org.apache.sshd.util.test.BaseTestSupport; import org.apache.sshd.util.test.NoIoTestCase; +import org.junit.After; import org.junit.Before; import org.junit.FixMethodOrder; import org.junit.Test; @@ -75,6 +76,7 @@ public class AbstractSessionTest extends BaseTestSupport { session = new MySession(); } + @After public void tearDown() throws Exception { if (session != null) { session.close(); diff --git a/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java b/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java index 9b4b963..ed619b5 100644 --- a/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java +++ b/sshd-core/src/test/java/org/apache/sshd/deprecated/ClientUserAuthServiceOld.java @@ -77,7 +77,7 @@ public class ClientUserAuthServiceOld extends AbstractCloseable implements Servi throw new IllegalStateException("Client side service used on server side"); } session = (ClientSessionImpl) s; - lock = session.getLock(); + lock = session.getFutureLock(); // Maintain the current auth status in the authFuture. authFuture = new DefaultAuthFuture(s.toString(), lock); } diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java index fb3b0fb..c9ee121 100644 --- a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java @@ -199,7 +199,7 @@ public class ServerTest extends BaseTestSupport { @Test public void testAuthenticationTimeout() throws Exception { - final long testAuthTimeout = TimeUnit.SECONDS.toMillis(5L); + final long testAuthTimeout = TimeUnit.SECONDS.toMillis(4L); PropertyResolverUtils.updateProperty(sshd, FactoryManager.AUTH_TIMEOUT, testAuthTimeout); AtomicReference<TimeoutStatus> timeoutHolder = new AtomicReference<>(); @@ -222,17 +222,19 @@ public class ServerTest extends BaseTestSupport { }); sshd.start(); client.start(); - Collection<ClientSession.ClientSessionEvent> res; try (ClientSession s = client.connect(getCurrentTestName(), TEST_LOCALHOST, sshd.getPort()) .verify(7L, TimeUnit.SECONDS) .getSession()) { - res = s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 2L * testAuthTimeout); + long waitStart = System.currentTimeMillis(); + Collection<ClientSession.ClientSessionEvent> res = + s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 3L * testAuthTimeout); + long waitEnd = System.currentTimeMillis(); + assertTrue("Invalid session state after " + (waitEnd - waitStart) + " ms: " + res, + res.containsAll(EnumSet.of(ClientSession.ClientSessionEvent.WAIT_AUTH))); } finally { client.stop(); } - assertTrue("Session should be closed: " + res, - res.containsAll(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.WAIT_AUTH))); assertSame("Mismatched timeout status reported", TimeoutStatus.AuthTimeout, timeoutHolder.getAndSet(null)); } @@ -301,7 +303,6 @@ public class ServerTest extends BaseTestSupport { sshd.start(); client.start(); - Collection<ClientSession.ClientSessionEvent> res; try (ClientSession s = createTestClientSession(sshd); ChannelShell shell = s.createShellChannel(); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -313,13 +314,16 @@ public class ServerTest extends BaseTestSupport { assertTrue("No changes in activated channels", channelListener.waitForActiveChannelsChange(5L, TimeUnit.SECONDS)); assertTrue("No changes in open channels", channelListener.waitForOpenChannelsChange(5L, TimeUnit.SECONDS)); - res = s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 2L * testIdleTimeout); + long waitStart = System.currentTimeMillis(); + Collection<ClientSession.ClientSessionEvent> res = + s.waitFor(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED), 3L * testIdleTimeout); + long waitEnd = System.currentTimeMillis(); + assertTrue("Invalid session state after " + (waitEnd - waitStart) + " ms: " + res, + res.containsAll(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED))); } finally { client.stop(); } - assertTrue("Session should be closed and authenticated: " + res, - res.containsAll(EnumSet.of(ClientSession.ClientSessionEvent.CLOSED, ClientSession.ClientSessionEvent.AUTHED))); assertTrue("Session latch not signalled in time", latch.await(1L, TimeUnit.SECONDS)); assertTrue("Shell latch not signalled in time", TestEchoShell.latch.await(1L, TimeUnit.SECONDS)); assertSame("Mismatched timeout status", TimeoutStatus.IdleTimeout, timeoutHolder.getAndSet(null)); diff --git a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java index 13dc910..ae15362 100644 --- a/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/server/channel/ChannelSessionTest.java @@ -19,6 +19,7 @@ package org.apache.sshd.server.channel; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -71,26 +72,33 @@ public class ChannelSessionTest extends BaseTestSupport { server.start(); client.start(); - try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, server.getPort()).verify(7L, TimeUnit.SECONDS).getSession()) { + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, server.getPort()) + .verify(7L, TimeUnit.SECONDS) + .getSession()) { session.addPasswordIdentity(getCurrentTestName()); session.auth().verify(5L, TimeUnit.SECONDS); try (ClientChannel channel = session.createChannel(Channel.CHANNEL_SHELL)) { + channel.open().verify(7L, TimeUnit.SECONDS); - channel.open().await(); - - channel.getInvertedIn().write("echo foo\nexit\n".getBytes()); - channel.getInvertedIn().flush(); + OutputStream invertedIn = channel.getInvertedIn(); + String cmdSent = "echo foo\nexit\n"; + invertedIn.write(cmdSent.getBytes()); + invertedIn.flush(); + long waitStart = System.currentTimeMillis(); Collection<ClientChannelEvent> result = - channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 5000); - assertTrue("Wrong channel state: " + result, result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED))); + channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), TimeUnit.SECONDS.toMillis(11L)); + long waitEnd = System.currentTimeMillis(); + assertTrue("Wrong channel state after " + (waitEnd - waitStart) + " ms.: " + result, + result.containsAll(EnumSet.of(ClientChannelEvent.CLOSED))); byte[] b = new byte[1024]; - int l = channel.getInvertedOut().read(b); - String s = l > 0 ? new String(b, 0, l) : ""; + InputStream invertedOut = channel.getInvertedOut(); + int l = invertedOut.read(b); + String cmdReceived = (l > 0) ? new String(b, 0, l) : ""; - assertEquals("echo foo\nexit\n", s); + assertEquals("Mismatched echoed command", cmdSent, cmdReceived); } } } @@ -111,13 +119,14 @@ public class ChannelSessionTest extends BaseTestSupport { } }) { AtomicBoolean expanded = new AtomicBoolean(false); - channelSession.asyncOut = new ChannelAsyncOutputStream(new BogusChannel(), (byte) 0) { - @Override - public void onWindowExpanded() throws IOException { - expanded.set(true); - super.onWindowExpanded(); - } - }; + channelSession.asyncOut = + new ChannelAsyncOutputStream(new BogusChannel(), (byte) 0) { + @Override + public void onWindowExpanded() throws IOException { + expanded.set(true); + super.onWindowExpanded(); + } + }; channelSession.handleWindowAdjust(buffer); assertTrue("Expanded ?", expanded.get()); } diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java index afab75d..8700e7a 100644 --- a/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java +++ b/sshd-core/src/test/java/org/apache/sshd/util/test/BogusChannel.java @@ -50,7 +50,7 @@ public class BogusChannel extends AbstractChannel { @Override public OpenFuture open(int recipient, long rwsize, long rmpsize, Buffer buffer) { - return new DefaultOpenFuture(this, this.lock); + return new DefaultOpenFuture(this, this.futureLock); } @Override diff --git a/sshd-mina/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java b/sshd-mina/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java index 449a0b0..1b231ff 100644 --- a/sshd-mina/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java +++ b/sshd-mina/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java @@ -107,7 +107,7 @@ public class MinaSession extends AbstractInnerCloseable implements IoSession { return new IoBaseCloseable() { @SuppressWarnings("synthetic-access") private final DefaultCloseFuture future = - new DefaultCloseFuture(MinaSession.this.toString(), lock); + new DefaultCloseFuture(MinaSession.this.toString(), futureLock); @SuppressWarnings("synthetic-access") @Override diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java index 96351f6..98d78ed 100644 --- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java @@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import org.apache.sshd.common.future.CloseFuture; -import org.apache.sshd.common.future.DefaultCloseFuture; import org.apache.sshd.common.io.IoAcceptor; import org.apache.sshd.common.io.IoHandler; import org.apache.sshd.common.io.IoServiceEventListener; @@ -59,7 +58,6 @@ import io.netty.util.concurrent.GlobalEventExecutor; */ public class NettyIoAcceptor extends NettyIoService implements IoAcceptor { protected final ServerBootstrap bootstrap = new ServerBootstrap(); - protected final DefaultCloseFuture closeFuture = new DefaultCloseFuture(toString(), lock); protected final Map<SocketAddress, Channel> boundAddresses = new ConcurrentHashMap<>(); public NettyIoAcceptor(NettyIoServiceFactory factory, IoHandler handler) { diff --git a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java index 64db5d8..580765d 100644 --- a/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java +++ b/sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java @@ -22,9 +22,9 @@ package org.apache.sshd.netty; import java.net.SocketAddress; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import org.apache.sshd.common.future.CloseFuture; -import org.apache.sshd.common.future.DefaultCloseFuture; import org.apache.sshd.common.io.AbstractIoWriteFuture; import org.apache.sshd.common.io.IoConnectFuture; import org.apache.sshd.common.io.IoHandler; @@ -54,7 +54,6 @@ public class NettyIoSession extends AbstractCloseable implements IoSession { protected final Map<Object, Object> attributes = new HashMap<>(); protected final NettyIoService service; protected final IoHandler handler; - protected final DefaultCloseFuture closeFuture = new DefaultCloseFuture(toString(), lock); protected final long id; protected ChannelHandlerContext context; protected SocketAddress remoteAddr; @@ -64,6 +63,8 @@ public class NettyIoSession extends AbstractCloseable implements IoSession { private final SocketAddress acceptanceAddress; public NettyIoSession(NettyIoService service, IoHandler handler, SocketAddress acceptanceAddress) { + super(Objects.toString(acceptanceAddress, "")); + this.service = service; this.handler = handler; this.id = service.sessionSeq.incrementAndGet();