This is an automated email from the ASF dual-hosted git repository. twolf pushed a commit to branch dev_3.0 in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit dfad9c08beee1fa01405017e901cc6f91d999614 Author: Thomas Wolf <tw...@apache.org> AuthorDate: Wed Apr 9 18:46:06 2025 +0200 Simplify checking for KEX when writing a message Better call startKexIsNeeded() explicitly than having a boolean parameter. --- .../sshd/common/session/filters/kex/KexFilter.java | 9 +-- .../session/filters/kex/KexOutputHandler.java | 64 +++++++++++----------- 2 files changed, 35 insertions(+), 38 deletions(-) diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java index d72fe506b..a12be542d 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java @@ -1085,12 +1085,9 @@ public class KexFilter extends IoFilter { } // Entry points for the KexOutputHandler - IoWriteFuture write(int cmd, Buffer buffer, boolean checkForKex) throws IOException { - IoWriteFuture result = forward.send(cmd, buffer); - if (checkForKex) { - startKexIfNeeded(); - } - return result; + + IoWriteFuture write(int cmd, Buffer buffer) throws IOException { + return forward.send(cmd, buffer); } void startKexIfNeeded() throws IOException { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexOutputHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexOutputHandler.java index d59ee3250..e9dd3f90b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexOutputHandler.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexOutputHandler.java @@ -219,7 +219,8 @@ class KexOutputHandler implements OutputHandler { try { if (isLowLevelMessage) { // Low-level messages can always be sent. - future = filter.write(cmd, buffer, true); + future = filter.write(cmd, buffer); + filter.startKexIfNeeded(); } else { future = writeOrEnqueue(cmd, buffer); if (!(future instanceof PendingWriteFuture)) { @@ -249,38 +250,36 @@ class KexOutputHandler implements OutputHandler { * @throws IOException if an error occurs */ private IoWriteFuture writeOrEnqueue(int cmd, Buffer buffer) throws IOException { - for (;;) { - // We must decide _and_ write the packet while holding the lock. If we'd write the packet outside this - // lock, there is no guarantee that a concurrently running KEX_INIT received from the peer doesn't change - // the state to RUN and grabs the encodeLock before the thread executing this write operation. If this - // happened, we might send a high-level messages after our KEX_INIT, which is not allowed by RFC 4253. - // - // Use the readLock here to give KEX state updates and the flushing thread priority. - lock.readLock().lock(); - try { - if (shutDown.get()) { - throw new SshException("Write attempt on closing session: " + SshConstants.getCommandMessageName(cmd)); - } - KexState state = filter.getKexState().get(); - boolean kexDone = KexState.DONE.equals(state) || KexState.KEYS.equals(state); - if (kexDone && kexFlushed.get()) { - // Not in KEX, no pending packets: out it goes. - return filter.write(cmd, buffer, false); - } else { - // Still in KEX or still flushing. Enqueue the packet; it will get written by the flushing thread at - // the end of KEX. See the javadoc of KexFilter. - // - // If so many packets are queued that flushing them triggers another KEX flushing stops - // and will be resumed at the end of the new KEX. - if (kexDone && log.isDebugEnabled()) { - log.debug("writeOrEnqueue({})[{}]: Queuing packet while flushing", filter.getSession(), - SshConstants.getCommandMessageName(cmd)); - } - return enqueuePendingPacket(cmd, buffer); + // We must decide _and_ write the packet while holding the lock. If we'd write the packet outside this + // lock, there is no guarantee that a concurrently running KEX_INIT received from the peer doesn't change + // the state to RUN and grabs the encodeLock before the thread executing this write operation. If this + // happened, we might send a high-level messages after our KEX_INIT, which is not allowed by RFC 4253. + // + // Use the readLock here to give KEX state updates and the flushing thread priority. + lock.readLock().lock(); + try { + if (shutDown.get()) { + throw new SshException("Write attempt on closing session: " + SshConstants.getCommandMessageName(cmd)); + } + KexState state = filter.getKexState().get(); + boolean kexDone = KexState.DONE.equals(state) || KexState.KEYS.equals(state); + if (kexDone && kexFlushed.get()) { + // Not in KEX, no pending packets: out it goes. + return filter.write(cmd, buffer); + } else { + // Still in KEX or still flushing. Enqueue the packet; it will get written by the flushing thread at + // the end of KEX. See the javadoc of KexFilter. + // + // If so many packets are queued that flushing them triggers another KEX flushing stops + // and will be resumed at the end of the new KEX. + if (kexDone && log.isDebugEnabled()) { + log.debug("writeOrEnqueue({})[{}]: Queuing packet while flushing", filter.getSession(), + SshConstants.getCommandMessageName(cmd)); } - } finally { - lock.readLock().unlock(); + return enqueuePendingPacket(cmd, buffer); } + } finally { + lock.readLock().unlock(); } } @@ -401,7 +400,8 @@ class KexOutputHandler implements OutputHandler { } Buffer buf = pending.getBuffer(); int cmd = buf.rawByte(buf.rpos()) & 0xFF; - written = filter.write(cmd, buf, true); + written = filter.write(cmd, buf); + filter.startKexIfNeeded(); } catch (Throwable e) { log.error("flushQueue({}): Exception while flushing packet at end of KEX for {}", filter.getSession(),