This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 962835303e015b09be51ffcce9392aae5103aef8 Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Thu Feb 27 10:43:54 2020 +0200 [SSHD-968] Interpret SSH_MSG_UNIMPLEMENTED response to a heartbeat request as a liveness indicator --- CHANGES.md | 7 ++ docs/client-setup.md | 5 +- .../org/apache/sshd/common/util/buffer/Buffer.java | 16 +++- .../sshd/common/util/buffer/ByteArrayBuffer.java | 5 ++ .../common/session/helpers/AbstractSession.java | 97 ++++++++++++++++++++-- .../sshd/common/session/helpers/SessionHelper.java | 2 +- .../test/java/org/apache/sshd/KeepAliveTest.java | 68 +++++++++++++-- 7 files changed, 180 insertions(+), 20 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 4bdb781..f605b5e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,8 +8,15 @@ ## Major code re-factoring +* Reception of an `SSH_MSG_UNIMPLEMENTED` response to a `SSH_MSG_GLOBAL_REQUEST` is +translated internally into the same code flow as if an `SSH_MSG_REQUEST_FAILURE` has +been received - see [SSHD-968](https://issues.apache.org/jira/browse/SSHD-968). + ## Minor code helpers +* Handling of debug/ignore/unimplemented messages has been split into `handleXXX` and `doInvokeXXXMessageHandler` methods +where the former validate the messages and deal with the idle timeout, and the latter execute the actual invocation. + ## Behavioral changes and enhancements * [SSHD-964](https://issues.apache.org/jira/browse/SSHD-964) - Send SSH_MSG_CHANNEL_EOF when tunnel channel being closed. diff --git a/docs/client-setup.md b/docs/client-setup.md index 6e275df..f2d2b5c 100644 --- a/docs/client-setup.md +++ b/docs/client-setup.md @@ -238,7 +238,10 @@ the `ClientSession` (for specific session configuration). * If specified timeout expires for the `wantReply` option then session will be **closed**. * *Any* response - including [`SSH_MSH_REQUEST_FAILURE`](https://tools.ietf.org/html/rfc4254#page-4) - is considered a "good" response for the heartbeat request. 
+ is considered a "good" response for the heartbeat request. In this context, a special patch + has been introduced in [SSHD-968](https://issues.apache.org/jira/browse/SSHD-968) that converts + an `SSH_MSG_UNIMPLEMENTED` response to such a global request into an `SSH_MSG_REQUEST_FAILURE` + since some servers have been found that violate the standard and reply with `SSH_MSG_UNIMPLEMENTED` to the request. * When using the CLI, these options can be configured using the following `-o key=value` properties: diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/Buffer.java b/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/Buffer.java index bb29204..2482fb0 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/Buffer.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/Buffer.java @@ -115,7 +115,9 @@ public abstract class Buffer implements Readable { /** * @param pos A position in the <U>raw</U> underlying data bytes - * @return The byte at that position + * @return The byte at the specified position without changing the + * current {@link #rpos() read position}. <B>Note:</B> no validation + * is made whether the position lies within array boundaries */ public byte rawByte(int pos) { byte[] data = array(); @@ -123,6 +125,18 @@ public abstract class Buffer implements Readable { } /** + * @param pos A position in the <U>raw</U> underlying data bytes + * @return The unsigned 32 bit integer at the specified position + * without changing the current {@link #rpos() read position}. + * <B>Note:</B> no validation is made whether the position and + * the required extra 4 bytes lie within array boundaries + */ + public long rawUInt(int pos) { + byte[] data = array(); + return BufferUtils.getUInt(data, pos, Integer.BYTES); + } + + /** * "Shift" the internal data so that reading starts from * position zero. 
*/ diff --git a/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/ByteArrayBuffer.java b/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/ByteArrayBuffer.java index e0dee96..65fcb0b 100644 --- a/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/ByteArrayBuffer.java +++ b/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/ByteArrayBuffer.java @@ -163,6 +163,11 @@ public class ByteArrayBuffer extends Buffer { } @Override + public long rawUInt(int pos) { + return BufferUtils.getUInt(data, pos, Integer.BYTES); + } + + @Override public void compact() { int avail = available(); if (avail > 0) { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index ab46578..5c43b8b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -193,6 +193,9 @@ public abstract class AbstractSession extends SessionHelper { protected final Queue<PendingWriteFuture> pendingPackets = new LinkedList<>(); protected Service currentService; + // SSHD-968 - outgoing sequence number and request name of last sent global request + protected final AtomicLong globalRequestSeqo = new AtomicLong(-1L); + protected final AtomicReference<String> pendingGlobalRequest = new AtomicReference<>(); // SSH_MSG_IGNORE stream padding protected int ignorePacketDataLength = FactoryManager.DEFAULT_IGNORE_MESSAGE_SIZE; @@ -797,10 +800,7 @@ public abstract class AbstractSession extends SessionHelper { } // if anyone waiting for global response notify them about the closing session - synchronized (requestResult) { - requestResult.set(GenericUtils.NULL); - requestResult.notifyAll(); - } + signalRequestFailure(); // Fire 'close' event try { @@ -837,6 +837,21 @@ public abstract class AbstractSession extends SessionHelper { } 
@Override + protected Buffer preProcessEncodeBuffer(int cmd, Buffer buffer) throws IOException { + buffer = super.preProcessEncodeBuffer(cmd, buffer); + // SSHD-968 - remember global request outgoing sequence number + if (cmd == SshConstants.SSH_MSG_GLOBAL_REQUEST) { + long prev = globalRequestSeqo.getAndSet(seqo); + if (log.isDebugEnabled()) { + log.debug("preProcessEncodeBuffer({}) outgoing SSH_MSG_GLOBAL_REQUEST seqNo={} => {}", + this, prev, globalRequestSeqo); + } + } + + return buffer; + } + + @Override public IoWriteFuture writePacket(Buffer buffer) throws IOException { // While exchanging key, queue high level packets if (!KexState.DONE.equals(kexState.get())) { @@ -957,11 +972,18 @@ public abstract class AbstractSession extends SessionHelper { Object result; boolean traceEnabled = log.isTraceEnabled(); + long prevGlobalReqSeqNo = -1L; synchronized (requestLock) { try { writePacket(buffer); + if (traceEnabled) { + log.debug("request({})[{}] sent with seqNo={}", this, request, globalRequestSeqo); + } + synchronized (requestResult) { + pendingGlobalRequest.set(request); + while (isOpen() && (maxWaitMillis > 0L) && (requestResult.get() == null)) { if (traceEnabled) { log.trace("request({})[{}] remaining wait={}", this, request, maxWaitMillis); @@ -980,6 +1002,9 @@ public abstract class AbstractSession extends SessionHelper { } result = requestResult.getAndSet(null); + // SSHD-968 reset tracked request name and sequence number + prevGlobalReqSeqNo = globalRequestSeqo.getAndSet(-1L); + pendingGlobalRequest.set(null); } } catch (InterruptedException e) { throw (InterruptedIOException) new InterruptedIOException( @@ -988,16 +1013,18 @@ public abstract class AbstractSession extends SessionHelper { } if (!isOpen()) { - throw new IOException("Session is closed or closing while awaiting reply for request=" + request); + throw new IOException( + "Session is closed or closing while awaiting reply for request=" + request); } if (debugEnabled) { - log.debug("request({}) 
request={}, timeout={} {}, result received={}", - this, request, timeout, unit, result != null); + log.debug("request({}) request={}, timeout={} {}, requestSeqNo={}, result received={}", + this, request, timeout, unit, prevGlobalReqSeqNo, result != null); } if (result == null) { - throw new SocketTimeoutException("No response received after " + timeout + " " + unit + " for request=" + request); + throw new SocketTimeoutException( + "No response received after " + timeout + " " + unit + " for request=" + request); } if (result instanceof Buffer) { @@ -1008,6 +1035,50 @@ public abstract class AbstractSession extends SessionHelper { } @Override + protected boolean doInvokeUnimplementedMessageHandler(int cmd, Buffer buffer) throws Exception { + /* + * SSHD-968 Some servers respond to global requests with SSH_MSG_UNIMPLEMENTED + * instead of SSH_MSG_REQUEST_FAILURE (as mandated by https://tools.ietf.org/html/rfc4254#section-4) + * so deal with it + */ + long reqSeqNo = -1L; + long msgSeqNo = -1L; + String reqGlobal = null; + boolean propagateCall = true; + if ((cmd == SshConstants.SSH_MSG_UNIMPLEMENTED) + && (globalRequestSeqo.get() >= 0L)) { + int rpos = buffer.rpos(); + msgSeqNo = buffer.rawUInt(rpos); + + synchronized (requestResult) { + // must re-fetch value under correct lock + reqSeqNo = globalRequestSeqo.get(); + if (reqSeqNo == msgSeqNo) { + reqGlobal = pendingGlobalRequest.get(); + propagateCall = false; + signalRequestFailure(); + } + } + } + + if (propagateCall) { + if (log.isTraceEnabled()) { + log.trace("doInvokeUnimplementedMessageHandler({}) reqSeqNo={}, msgSeqNo={}, reqGlobal={}", + this, reqSeqNo, msgSeqNo, reqGlobal); + } + + return super.doInvokeUnimplementedMessageHandler(cmd, buffer); + } + + if (log.isDebugEnabled()) { + log.debug("doInvokeUnimplementedMessageHandler({}) report global request={} failure for seqNo={}", + this, reqGlobal, reqSeqNo); + } + + return true; // message handled internally + } + + @Override public Buffer createBuffer(byte 
cmd, int len) { if (len <= 0) { return prepareBuffer(cmd, new ByteArrayBuffer()); @@ -1779,7 +1850,8 @@ public abstract class AbstractSession extends SessionHelper { */ protected void requestSuccess(Buffer buffer) throws Exception { // use a copy of the original data in case it is re-used on return - Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), buffer.rpos(), buffer.available()); + Buffer resultBuf = ByteArrayBuffer.getCompactClone( + buffer.array(), buffer.rpos(), buffer.available()); synchronized (requestResult) { requestResult.set(resultBuf); resetIdleTimeout(); @@ -1794,6 +1866,13 @@ public abstract class AbstractSession extends SessionHelper { * @throws Exception If failed to handle the message */ protected void requestFailure(Buffer buffer) throws Exception { + signalRequestFailure(); + } + + /** + * Marks the current pending global request result as failed + */ + protected void signalRequestFailure() { synchronized (requestResult) { requestResult.set(GenericUtils.NULL); resetIdleTimeout(); diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java index 3676a44..7e021ba 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java @@ -556,10 +556,10 @@ public abstract class SessionHelper extends AbstractKexFactoryManager implements return writePacket(buffer); } - @SuppressWarnings("unchecked") @Override public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit unit) throws IOException { IoWriteFuture writeFuture = writePacket(buffer); + @SuppressWarnings("unchecked") DefaultSshFuture<IoWriteFuture> future = (DefaultSshFuture<IoWriteFuture>) writeFuture; FactoryManager factoryManager = getFactoryManager(); ScheduledExecutorService executor = factoryManager.getScheduledExecutorService(); 
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java index efba1ae..2ddf72a 100644 --- a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java @@ -20,7 +20,9 @@ package org.apache.sshd; import java.io.ByteArrayOutputStream; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -32,6 +34,10 @@ import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.PropertyResolverUtils; import org.apache.sshd.common.channel.Channel; +import org.apache.sshd.common.channel.RequestHandler; +import org.apache.sshd.common.session.ConnectionService; +import org.apache.sshd.common.session.helpers.AbstractConnectionServiceRequestHandler; +import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.server.SshServer; import org.apache.sshd.server.channel.ChannelSession; import org.apache.sshd.server.command.Command; @@ -101,17 +107,21 @@ public class KeepAliveTest extends BaseTestSupport { @After public void tearDown() { + // Restore default value PropertyResolverUtils.updateProperty( sshd, FactoryManager.IDLE_TIMEOUT, FactoryManager.DEFAULT_IDLE_TIMEOUT); PropertyResolverUtils.updateProperty( client, ClientFactoryManager.HEARTBEAT_INTERVAL, ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL); + PropertyResolverUtils.updateProperty( + client, ClientFactoryManager.HEARTBEAT_REPLY_WAIT, ClientFactoryManager.DEFAULT_HEARTBEAT_REPLY_WAIT); } @Test public void testIdleClient() throws Exception { - try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) - .verify(7L, TimeUnit.SECONDS) - .getSession()) { + try (ClientSession session = + client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + 
.verify(7L, TimeUnit.SECONDS) + .getSession()) { session.addPasswordIdentity(getCurrentTestName()); session.auth().verify(5L, TimeUnit.SECONDS); @@ -128,10 +138,12 @@ public class KeepAliveTest extends BaseTestSupport { @Test public void testClientWithHeartBeat() throws Exception { - PropertyResolverUtils.updateProperty(client, ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT); - try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) - .verify(7L, TimeUnit.SECONDS) - .getSession()) { + PropertyResolverUtils.updateProperty( + client, ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT); + try (ClientSession session = + client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + .verify(7L, TimeUnit.SECONDS) + .getSession()) { session.addPasswordIdentity(getCurrentTestName()); session.auth().verify(5L, TimeUnit.SECONDS); @@ -150,7 +162,8 @@ public class KeepAliveTest extends BaseTestSupport { public void testShellClosedOnClientTimeout() throws Exception { TestEchoShell.latch = new CountDownLatch(1); - try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + try (ClientSession session = + client.connect(getCurrentTestName(), TEST_LOCALHOST, port) .verify(7L, TimeUnit.SECONDS) .getSession()) { session.addPasswordIdentity(getCurrentTestName()); @@ -178,6 +191,45 @@ public class KeepAliveTest extends BaseTestSupport { } } + @Test // see SSHD-968 + public void testAllowUnimplementedMessageHeartbeatResponse() throws Exception { + List<RequestHandler<ConnectionService>> globalHandlers = sshd.getGlobalRequestHandlers(); + sshd.setGlobalRequestHandlers( + Collections.singletonList( + new AbstractConnectionServiceRequestHandler() { + @Override + public Result process( + ConnectionService connectionService, String request, + boolean wantReply, Buffer buffer) + throws Exception { + connectionService.process( + 255 /* trigger unimplemented command handler */, buffer); + return Result.Replied; + } + })); + 
PropertyResolverUtils.updateProperty( + client, ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT); + PropertyResolverUtils.updateProperty( + client, ClientFactoryManager.HEARTBEAT_REPLY_WAIT, TimeUnit.SECONDS.toMillis(5L)); + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port) + .verify(7L, TimeUnit.SECONDS) + .getSession()) { + session.addPasswordIdentity(getCurrentTestName()); + session.auth().verify(5L, TimeUnit.SECONDS); + + try (ClientChannel channel = session.createChannel(Channel.CHANNEL_SHELL)) { + long waitStart = System.currentTimeMillis(); + Collection<ClientChannelEvent> result = + channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), WAIT); + long waitEnd = System.currentTimeMillis(); + assertTrue("Wrong channel state after wait of " + (waitEnd - waitStart) + " ms: " + result, + result.contains(ClientChannelEvent.TIMEOUT)); + } + } finally { + sshd.setGlobalRequestHandlers(globalHandlers); // restore original + } + } + public static class TestEchoShellFactory extends EchoShellFactory { public TestEchoShellFactory() { super();