This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit 962835303e015b09be51ffcce9392aae5103aef8
Author: Lyor Goldstein <lgoldst...@apache.org>
AuthorDate: Thu Feb 27 10:43:54 2020 +0200

    [SSHD-968] Interpret SSH_MSG_UNIMPLEMENTED response to a heartbeat request 
as a liveness indicator
---
 CHANGES.md                                         |  7 ++
 docs/client-setup.md                               |  5 +-
 .../org/apache/sshd/common/util/buffer/Buffer.java | 16 +++-
 .../sshd/common/util/buffer/ByteArrayBuffer.java   |  5 ++
 .../common/session/helpers/AbstractSession.java    | 97 ++++++++++++++++++++--
 .../sshd/common/session/helpers/SessionHelper.java |  2 +-
 .../test/java/org/apache/sshd/KeepAliveTest.java   | 68 +++++++++++++--
 7 files changed, 180 insertions(+), 20 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 4bdb781..f605b5e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,8 +8,15 @@
 
 ## Major code re-factoring
 
+* Reception of an `SSH_MSG_UNIMPLEMENTED` response to a 
`SSH_MSG_GLOBAL_REQUEST` is
+translated internally into same code flow as if an `SSH_MSH_REQUEST_FAILURE` 
has
+been received - see [SSHD-968](https://issues.apache.org/jira/browse/SSHD-968).
+
 ## Minor code helpers
 
+* Handling of debug/ignore/unimplemented messages has been split into 
`handleXXX` and `doInvokeXXXMsgHandler` methods
+where the former validate the messages and deal with the idle timeout, and the 
latter execute the actual invcation.
+
 ## Behavioral changes and enhancements
 
 * [SSHD-964](https://issues.apache.org/jira/browse/SSHD-964) - Send 
SSH_MSG_CHANNEL_EOF when tunnel channel being closed.
diff --git a/docs/client-setup.md b/docs/client-setup.md
index 6e275df..f2d2b5c 100644
--- a/docs/client-setup.md
+++ b/docs/client-setup.md
@@ -238,7 +238,10 @@ the `ClientSession` (for specific session configuration).
     * If specified timeout expires for the `wantReply` option then session 
will be **closed**.
 
     * *Any* response - including 
[`SSH_MSH_REQUEST_FAILURE`](https://tools.ietf.org/html/rfc4254#page-4)
-    is considered a "good" response for the heartbeat request.
+    is considered a "good" response for the heartbeat request. In this 
context, a special patch
+    has been introduced in 
[SSHD-968](https://issues.apache.org/jira/browse/SSHD-968) that converts
+    an `SSH_MSG_UNIMPLEMENTED` response to such a global request into a 
`SSH_MSH_REQUEST_FAILURE`
+    since some servers have been found that violate the standard and reply 
with it to the request.
 
 * When using the CLI, these options can be configured using the following `-o 
key=value` properties:
 
diff --git 
a/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/Buffer.java 
b/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/Buffer.java
index bb29204..2482fb0 100644
--- a/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/Buffer.java
+++ b/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/Buffer.java
@@ -115,7 +115,9 @@ public abstract class Buffer implements Readable {
 
     /**
      * @param pos A position in the <U>raw</U> underlying data bytes
-     * @return The byte at that position
+     * @return The byte at the specified position without changing the
+     * current {@link #rpos() read position}. <B>Note:</B> no validation
+     * is made whether the position lies within array boundaries
      */
     public byte rawByte(int pos) {
         byte[] data = array();
@@ -123,6 +125,18 @@ public abstract class Buffer implements Readable {
     }
 
     /**
+     * @param pos A position in the <U>raw</U> underlying data bytes
+     * @return The unsigned 32 bit integer at the specified position
+     * without changing the current {@link #rpos() read position}.
+     * <B>Note:</B> no validation is made whether the position and
+     * the required extra 4 bytes lie within array boundaries
+     */
+    public long rawUInt(int pos) {
+        byte[] data = array();
+        return BufferUtils.getUInt(data, pos, Integer.BYTES);
+    }
+
+    /**
      * &quot;Shift&quot; the internal data so that reading starts from
      * position zero.
      */
diff --git 
a/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/ByteArrayBuffer.java
 
b/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/ByteArrayBuffer.java
index e0dee96..65fcb0b 100644
--- 
a/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/ByteArrayBuffer.java
+++ 
b/sshd-common/src/main/java/org/apache/sshd/common/util/buffer/ByteArrayBuffer.java
@@ -163,6 +163,11 @@ public class ByteArrayBuffer extends Buffer {
     }
 
     @Override
+    public long rawUInt(int pos) {
+        return BufferUtils.getUInt(data, pos, Integer.BYTES);
+    }
+
+    @Override
     public void compact() {
         int avail = available();
         if (avail > 0) {
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index ab46578..5c43b8b 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -193,6 +193,9 @@ public abstract class AbstractSession extends SessionHelper 
{
     protected final Queue<PendingWriteFuture> pendingPackets = new 
LinkedList<>();
 
     protected Service currentService;
+    // SSHD-968 - outgoing sequence number and request name of last sent 
global request
+    protected final AtomicLong globalRequestSeqo = new AtomicLong(-1L);
+    protected final AtomicReference<String> pendingGlobalRequest = new 
AtomicReference<>();
 
     // SSH_MSG_IGNORE stream padding
     protected int ignorePacketDataLength = 
FactoryManager.DEFAULT_IGNORE_MESSAGE_SIZE;
@@ -797,10 +800,7 @@ public abstract class AbstractSession extends 
SessionHelper {
         }
 
         // if anyone waiting for global response notify them about the closing 
session
-        synchronized (requestResult) {
-            requestResult.set(GenericUtils.NULL);
-            requestResult.notifyAll();
-        }
+        signalRequestFailure();
 
         // Fire 'close' event
         try {
@@ -837,6 +837,21 @@ public abstract class AbstractSession extends 
SessionHelper {
     }
 
     @Override
+    protected Buffer preProcessEncodeBuffer(int cmd, Buffer buffer) throws 
IOException {
+        buffer = super.preProcessEncodeBuffer(cmd, buffer);
+        // SSHD-968 - remember global request outgoing sequence number
+        if (cmd == SshConstants.SSH_MSG_GLOBAL_REQUEST) {
+            long prev = globalRequestSeqo.getAndSet(seqo);
+            if (log.isDebugEnabled()) {
+                log.debug("preProcessEncodeBuffer({}) outgoing 
SSH_MSG_GLOBAL_REQUEST seqNo={} => {}",
+                    this, prev, globalRequestSeqo);
+            }
+        }
+
+        return buffer;
+    }
+
+    @Override
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         // While exchanging key, queue high level packets
         if (!KexState.DONE.equals(kexState.get())) {
@@ -957,11 +972,18 @@ public abstract class AbstractSession extends 
SessionHelper {
 
         Object result;
         boolean traceEnabled = log.isTraceEnabled();
+        long prevGlobalReqSeqNo = -1L;
         synchronized (requestLock) {
             try {
                 writePacket(buffer);
 
+                if (traceEnabled) {
+                    log.debug("request({})[{}] sent with seqNo={}", this, 
request, globalRequestSeqo);
+                }
+
                 synchronized (requestResult) {
+                    pendingGlobalRequest.set(request);
+
                     while (isOpen() && (maxWaitMillis > 0L) && 
(requestResult.get() == null)) {
                         if (traceEnabled) {
                             log.trace("request({})[{}] remaining wait={}", 
this, request, maxWaitMillis);
@@ -980,6 +1002,9 @@ public abstract class AbstractSession extends 
SessionHelper {
                     }
 
                     result = requestResult.getAndSet(null);
+                    // SSHD-968 reset tracked request name and sequence number
+                    prevGlobalReqSeqNo = globalRequestSeqo.getAndSet(-1L);
+                    pendingGlobalRequest.set(null);
                 }
             } catch (InterruptedException e) {
                 throw (InterruptedIOException) new InterruptedIOException(
@@ -988,16 +1013,18 @@ public abstract class AbstractSession extends 
SessionHelper {
         }
 
         if (!isOpen()) {
-            throw new IOException("Session is closed or closing while awaiting 
reply for request=" + request);
+            throw new IOException(
+                "Session is closed or closing while awaiting reply for 
request=" + request);
         }
 
         if (debugEnabled) {
-            log.debug("request({}) request={}, timeout={} {}, result 
received={}",
-                  this, request, timeout, unit, result != null);
+            log.debug("request({}) request={}, timeout={} {}, requestSeqNo={}, 
result received={}",
+                this, request, timeout, unit, prevGlobalReqSeqNo, result != 
null);
         }
 
         if (result == null) {
-            throw new SocketTimeoutException("No response received after " + 
timeout + " " + unit + " for request=" + request);
+            throw new SocketTimeoutException(
+                "No response received after " + timeout + " " + unit + " for 
request=" + request);
         }
 
         if (result instanceof Buffer) {
@@ -1008,6 +1035,50 @@ public abstract class AbstractSession extends 
SessionHelper {
     }
 
     @Override
+    protected boolean doInvokeUnimplementedMessageHandler(int cmd, Buffer 
buffer) throws Exception {
+        /*
+         * SSHD-968 Some servers respond to global requests with 
SSH_MSG_UNIMPLEMENTED
+         * instead of SSH_MSG_REQUEST_FAILURE (as mandated by 
https://tools.ietf.org/html/rfc4254#section-4)
+         * so deal with it
+         */
+        long reqSeqNo = -1L;
+        long msgSeqNo = -1L;
+        String reqGlobal = null;
+        boolean propagateCall = true;
+        if ((cmd == SshConstants.SSH_MSG_UNIMPLEMENTED)
+                && (globalRequestSeqo.get() >= 0L)) {
+            int rpos = buffer.rpos();
+            msgSeqNo = buffer.rawUInt(rpos);
+
+            synchronized (requestResult) {
+                // must re-fetch value under correct lock
+                reqSeqNo = globalRequestSeqo.get();
+                if (reqSeqNo == msgSeqNo) {
+                    reqGlobal = pendingGlobalRequest.get();
+                    propagateCall = false;
+                    signalRequestFailure();
+                }
+            }
+        }
+
+        if (propagateCall) {
+            if (log.isTraceEnabled()) {
+                log.trace("doInvokeUnimplementedMessageHandler({}) 
reqSeqNo={}, msgSeqNo={}, reqGlobal={}",
+                    this, reqSeqNo, msgSeqNo, reqGlobal);
+            }
+
+            return super.doInvokeUnimplementedMessageHandler(cmd, buffer);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("doInvokeUnimplementedMessageHandler({}) report global 
request={} failure for seqNo={}",
+                this, reqGlobal, reqSeqNo);
+        }
+
+        return true;    // message handled internally
+    }
+
+    @Override
     public Buffer createBuffer(byte cmd, int len) {
         if (len <= 0) {
             return prepareBuffer(cmd, new ByteArrayBuffer());
@@ -1779,7 +1850,8 @@ public abstract class AbstractSession extends 
SessionHelper {
      */
     protected void requestSuccess(Buffer buffer) throws Exception {
         // use a copy of the original data in case it is re-used on return
-        Buffer resultBuf = ByteArrayBuffer.getCompactClone(buffer.array(), 
buffer.rpos(), buffer.available());
+        Buffer resultBuf = ByteArrayBuffer.getCompactClone(
+            buffer.array(), buffer.rpos(), buffer.available());
         synchronized (requestResult) {
             requestResult.set(resultBuf);
             resetIdleTimeout();
@@ -1794,6 +1866,13 @@ public abstract class AbstractSession extends 
SessionHelper {
      * @throws Exception If failed to handle the message
      */
     protected void requestFailure(Buffer buffer) throws Exception {
+        signalRequestFailure();
+    }
+
+    /**
+     * Marks the current pending global request result as failed
+     */
+    protected void signalRequestFailure() {
         synchronized (requestResult) {
             requestResult.set(GenericUtils.NULL);
             resetIdleTimeout();
diff --git 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
index 3676a44..7e021ba 100644
--- 
a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
+++ 
b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/SessionHelper.java
@@ -556,10 +556,10 @@ public abstract class SessionHelper extends 
AbstractKexFactoryManager implements
         return writePacket(buffer);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit 
unit) throws IOException {
         IoWriteFuture writeFuture = writePacket(buffer);
+        @SuppressWarnings("unchecked")
         DefaultSshFuture<IoWriteFuture> future = 
(DefaultSshFuture<IoWriteFuture>) writeFuture;
         FactoryManager factoryManager = getFactoryManager();
         ScheduledExecutorService executor = 
factoryManager.getScheduledExecutorService();
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java 
b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
index efba1ae..2ddf72a 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
@@ -20,7 +20,9 @@ package org.apache.sshd;
 
 import java.io.ByteArrayOutputStream;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -32,6 +34,10 @@ import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.RequestHandler;
+import org.apache.sshd.common.session.ConnectionService;
+import 
org.apache.sshd.common.session.helpers.AbstractConnectionServiceRequestHandler;
+import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.server.SshServer;
 import org.apache.sshd.server.channel.ChannelSession;
 import org.apache.sshd.server.command.Command;
@@ -101,17 +107,21 @@ public class KeepAliveTest extends BaseTestSupport {
 
     @After
     public void tearDown() {
+        // Restore default value
         PropertyResolverUtils.updateProperty(
             sshd, FactoryManager.IDLE_TIMEOUT, 
FactoryManager.DEFAULT_IDLE_TIMEOUT);
         PropertyResolverUtils.updateProperty(
             client, ClientFactoryManager.HEARTBEAT_INTERVAL, 
ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL);
+        PropertyResolverUtils.updateProperty(
+            client, ClientFactoryManager.HEARTBEAT_REPLY_WAIT, 
ClientFactoryManager.DEFAULT_HEARTBEAT_REPLY_WAIT);
     }
 
     @Test
     public void testIdleClient() throws Exception {
-        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
-                .verify(7L, TimeUnit.SECONDS)
-                .getSession()) {
+        try (ClientSession session =
+                client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
+                    .verify(7L, TimeUnit.SECONDS)
+                    .getSession()) {
             session.addPasswordIdentity(getCurrentTestName());
             session.auth().verify(5L, TimeUnit.SECONDS);
 
@@ -128,10 +138,12 @@ public class KeepAliveTest extends BaseTestSupport {
 
     @Test
     public void testClientWithHeartBeat() throws Exception {
-        PropertyResolverUtils.updateProperty(client, 
ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT);
-        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
-                .verify(7L, TimeUnit.SECONDS)
-                .getSession()) {
+        PropertyResolverUtils.updateProperty(
+            client, ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT);
+        try (ClientSession session =
+                client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
+                    .verify(7L, TimeUnit.SECONDS)
+                    .getSession()) {
             session.addPasswordIdentity(getCurrentTestName());
             session.auth().verify(5L, TimeUnit.SECONDS);
 
@@ -150,7 +162,8 @@ public class KeepAliveTest extends BaseTestSupport {
     public void testShellClosedOnClientTimeout() throws Exception {
         TestEchoShell.latch = new CountDownLatch(1);
 
-        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
+        try (ClientSession session =
+                client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
                     .verify(7L, TimeUnit.SECONDS)
                     .getSession()) {
             session.addPasswordIdentity(getCurrentTestName());
@@ -178,6 +191,45 @@ public class KeepAliveTest extends BaseTestSupport {
         }
     }
 
+    @Test // see SSHD-968
+    public void testAllowUnimplementedMessageHeartbeatResponse() throws 
Exception {
+        List<RequestHandler<ConnectionService>> globalHandlers = 
sshd.getGlobalRequestHandlers();
+        sshd.setGlobalRequestHandlers(
+            Collections.singletonList(
+                new AbstractConnectionServiceRequestHandler() {
+                    @Override
+                    public Result process(
+                            ConnectionService connectionService, String 
request,
+                            boolean wantReply, Buffer buffer)
+                                throws Exception {
+                        connectionService.process(
+                            255 /* trigger unimplemented command handler */, 
buffer);
+                        return Result.Replied;
+                    }
+                }));
+        PropertyResolverUtils.updateProperty(
+            client, ClientFactoryManager.HEARTBEAT_INTERVAL, HEARTBEAT);
+        PropertyResolverUtils.updateProperty(
+            client, ClientFactoryManager.HEARTBEAT_REPLY_WAIT, 
TimeUnit.SECONDS.toMillis(5L));
+        try (ClientSession session = client.connect(getCurrentTestName(), 
TEST_LOCALHOST, port)
+                .verify(7L, TimeUnit.SECONDS)
+                .getSession()) {
+            session.addPasswordIdentity(getCurrentTestName());
+            session.auth().verify(5L, TimeUnit.SECONDS);
+
+            try (ClientChannel channel = 
session.createChannel(Channel.CHANNEL_SHELL)) {
+                long waitStart = System.currentTimeMillis();
+                Collection<ClientChannelEvent> result =
+                    channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 
WAIT);
+                long waitEnd = System.currentTimeMillis();
+                assertTrue("Wrong channel state after wait of " + (waitEnd - 
waitStart) + " ms: " + result,
+                    result.contains(ClientChannelEvent.TIMEOUT));
+            }
+        } finally {
+            sshd.setGlobalRequestHandlers(globalHandlers); // restore original
+        }
+    }
+
     public static class TestEchoShellFactory extends EchoShellFactory {
         public TestEchoShellFactory() {
             super();

Reply via email to