This is an automated email from the ASF dual-hosted git repository. lgoldstein pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push: new fd0599c [SSHD-966] Separated pending packets enqueue to separated method in AbstractSession fd0599c is described below commit fd0599c387fbfb1d538cc7fe6b7334ef274f5803 Author: Lyor Goldstein <lgoldst...@apache.org> AuthorDate: Sun Mar 8 20:44:02 2020 +0200 [SSHD-966] Separated pending packets enqueue to separated method in AbstractSession --- .../common/session/helpers/AbstractSession.java | 69 ++++++++++++++++------ 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index 5c95ab3..6718034 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -755,9 +755,9 @@ public abstract class AbstractSession extends SessionHelper { int numPending = packetsQueue.size(); List<SimpleImmutableEntry<PendingWriteFuture, IoWriteFuture>> pendingWrites = new ArrayList<>(numPending); synchronized (encodeLock) { - for (PendingWriteFuture future = pendingPackets.poll(); + for (PendingWriteFuture future = packetsQueue.poll(); future != null; - future = pendingPackets.poll()) { + future = packetsQueue.poll()) { IoWriteFuture writeFuture = doWritePacket(future.getBuffer()); pendingWrites.add(new SimpleImmutableEntry<>(future, writeFuture)); } @@ -853,23 +853,9 @@ public abstract class AbstractSession extends SessionHelper { @Override public IoWriteFuture writePacket(Buffer buffer) throws IOException { // While exchanging key, queue high level packets - if (!KexState.DONE.equals(kexState.get())) { - byte[] bufData = buffer.array(); - int cmd = bufData[buffer.rpos()] & 0xFF; - if (cmd > SshConstants.SSH_MSG_KEX_LAST) { - String cmdName = SshConstants.getCommandMessageName(cmd); - boolean debugEnabled = log.isDebugEnabled(); - synchronized (pendingPackets) { - if (!KexState.DONE.equals(kexState.get())) { - if (pendingPackets.isEmpty() && debugEnabled) { - log.debug("writePacket({})[{}] Start flagging packets as pending until key exchange is done", this, cmdName); - } - PendingWriteFuture future = new PendingWriteFuture(cmdName, buffer); - pendingPackets.add(future); - return future; - } - } - } + PendingWriteFuture future = enqueuePendingPacket(buffer); + if (future != null) { + return future; } try { @@ -889,6 +875,51 @@ public abstract class AbstractSession extends SessionHelper { } } + /** + * Checks if key-exchange is done - if so, or the packet is related to the + * key-exchange protocol, then allows the packet to go through, otherwise + * enqueues it to be sent when key-exchange completed + * + * @param buffer The {@link Buffer} containing the packet to be sent + * @return A {@link PendingWriteFuture} if enqueued, {@code null} if + * packet can go through. + */ + protected PendingWriteFuture enqueuePendingPacket(Buffer buffer) { + if (KexState.DONE.equals(kexState.get())) { + return null; + } + + byte[] bufData = buffer.array(); + int cmd = bufData[buffer.rpos()] & 0xFF; + if (cmd <= SshConstants.SSH_MSG_KEX_LAST) { + return null; + } + + String cmdName = SshConstants.getCommandMessageName(cmd); + PendingWriteFuture future; + int numPending; + synchronized (pendingPackets) { + if (KexState.DONE.equals(kexState.get())) { + return null; + } + + future = new PendingWriteFuture(cmdName, buffer); + pendingPackets.add(future); + numPending = pendingPackets.size(); + } + + if (log.isDebugEnabled()) { + if (numPending == 1) { + log.debug("enqueuePendingPacket({})[{}] Start flagging packets as pending until key exchange is done", this, cmdName); + } else { + log.debug("enqueuePendingPacket({})[{}] enqueued until key exchange is done (pending={})", this, cmdName, numPending); + + } + } + + return future; + } + // NOTE: must acquire encodeLock when calling this method protected Buffer resolveOutputPacket(Buffer buffer) throws IOException { Buffer ignoreBuf = null;