This is an automated email from the ASF dual-hosted git repository. twolf pushed a commit to branch dev_3.0 in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit d99b07b5570f845af2b7e317666d69bafcc1269a Author: Thomas Wolf <tw...@apache.org> AuthorDate: Tue Apr 1 23:30:48 2025 +0200 Re-implement the filter chain as a doubly linked list This simplifies managing the list. The list nodes are FilterContexts, not the filters themselves, which keeps concerns separate and gives a nicer way to forward requests between filters on the chain. --- .../sshd/common/filter/DefaultFilterChain.java | 171 +++++++++------------ .../java/org/apache/sshd/common/filter/Filter.java | 4 +- .../org/apache/sshd/common/filter/FilterChain.java | 23 +-- .../{FilterChain.java => FilterContext.java} | 63 ++++---- .../org/apache/sshd/common/filter/IoFilter.java | 16 +- .../common/session/filters/CompressionFilter.java | 4 +- .../sshd/common/session/filters/CryptFilter.java | 4 +- .../common/session/filters/DelayKexInitFilter.java | 41 +++-- .../sshd/common/session/filters/IdentFilter.java | 27 +--- .../common/session/filters/InjectIgnoreFilter.java | 4 +- .../sshd/common/session/filters/kex/KexFilter.java | 8 +- .../common/session/helpers/AbstractSession.java | 8 +- .../common/session/filters/FilterTestSupport.java | 4 +- 13 files changed, 162 insertions(+), 215 deletions(-) diff --git a/sshd-core/src/main/java/org/apache/sshd/common/filter/DefaultFilterChain.java b/sshd-core/src/main/java/org/apache/sshd/common/filter/DefaultFilterChain.java index 9050d0f2e..1a18587fc 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/filter/DefaultFilterChain.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/DefaultFilterChain.java @@ -19,9 +19,7 @@ package org.apache.sshd.common.filter; import java.io.IOException; -import java.util.NoSuchElementException; import java.util.Objects; -import java.util.concurrent.CopyOnWriteArrayList; import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.util.Readable; @@ -32,147 +30,122 @@ import org.apache.sshd.common.util.buffer.Buffer; */ public class DefaultFilterChain implements FilterChain { - private final CopyOnWriteArrayList<Filter> chain = new CopyOnWriteArrayList<>(); + private FilterContext head; + + private FilterContext tail; public DefaultFilterChain() { super(); } - private Filter notDuplicate(Filter filter) { - if (chain.indexOf(Objects.requireNonNull(filter)) >= 0) { - throw new IllegalStateException("Duplicate filter " + filter); - } - return filter; - } - - private void addAt(int i, Filter filter) { - if (i < 0) { - throw new NoSuchElementException(); - } - notDuplicate(filter).adding(this); - chain.add(i, filter); - filter.added(this); - } - @Override public boolean isEmpty() { - return chain.isEmpty(); - } - - @Override - public void addFirst(Filter filter) { - addAt(0, filter); - } - - @Override - public void addLast(Filter filter) { - addAt(chain.size(), filter); + return head == null; } @Override - public void addBefore(Filter toAdd, Filter before) { - addAt(chain.indexOf(Objects.requireNonNull(before)), notDuplicate(toAdd)); - } - - @Override - public void addAfter(Filter toAdd, Filter after) { - addAt(chain.indexOf(Objects.requireNonNull(after)) + 1, notDuplicate(toAdd)); - } - - @Override - public void remove(Filter filter) { - int i = chain.indexOf(Objects.requireNonNull(filter)); - if (i < 0) { - throw new IllegalArgumentException("Filter not in filter chain " + filter); + public synchronized FilterContext addFirst(Filter filter) { + FilterContext ctx = new FilterContext(this, filter); + filter.adding(ctx); + ctx.prev = null; + ctx.next = head; + if (head != null) { + ctx.next.prev = ctx; + } + head = ctx; + if (tail == null) { + tail = ctx; } - filter.removing(); - chain.remove(filter); - filter.removed(this); + filter.added(ctx); + return ctx; } @Override - public void replace(Filter oldFilter, Filter newFilter) { - if (oldFilter.equals(Objects.requireNonNull(newFilter))) { - return; + public synchronized FilterContext addLast(Filter filter) { + FilterContext ctx = new FilterContext(this, filter); + filter.adding(ctx); + ctx.next = null; + ctx.prev = tail; + if (tail != null) { + ctx.prev.next = ctx; } - int i = chain.indexOf(oldFilter); - if (i < 0) { - throw new IllegalArgumentException("Filter not in filter chain " + oldFilter); + tail = ctx; + if (head == null) { + head = ctx; } - oldFilter.removing(); - chain.remove(i); - oldFilter.removed(this); - newFilter.adding(this); - chain.add(i, newFilter); - newFilter.added(this); + filter.added(ctx); + return ctx; } @Override - public Filter getFirst() { - return chain.isEmpty() ? null : chain.get(0); + public synchronized FilterContext addBefore(Filter filter, FilterContext before) { + Objects.requireNonNull(before); + FilterContext ctx = new FilterContext(this, filter); + filter.adding(ctx); + ctx.next = before; + ctx.prev = before.prev; + before.prev = ctx; + if (ctx.prev == null) { + head = ctx; + } else { + ctx.prev.next = ctx; + } + filter.added(ctx); + return ctx; } @Override - public Filter getLast() { - int i = chain.size(); - if (i == 0) { - return null; + public synchronized FilterContext addAfter(Filter filter, FilterContext after) { + Objects.requireNonNull(after); + FilterContext ctx = new FilterContext(this, filter); + filter.adding(ctx); + ctx.prev = after; + ctx.next = after.next; + after.next = ctx; + if (ctx.next == null) { + tail = ctx; + } else { + ctx.next.prev = ctx; } - return chain.get(i - 1); + filter.added(ctx); + return ctx; } @Override - public Filter getNext(Filter from) { - int i = chain.indexOf(from); - if (i < 0) { - throw new IllegalArgumentException("Filter not in filter chain: " + from); - } - if (i == chain.size() - 1) { - return null; - } - return chain.get(i + 1); + public synchronized Filter getFirst() { + return head == null ? null : head.filter; } @Override - public Filter getPrevious(Filter from) { - int i = chain.indexOf(from); - if (i < 0) { - throw new IllegalArgumentException("Filter not in filter chain: " + from); - } - return i == 0 ? null : chain.get(i - 1); + public synchronized Filter getLast() { + return tail == null ? null : tail.filter; } @Override - public IoWriteFuture send(Filter current, Buffer message) throws IOException { - int i = chain.indexOf(current); - if (i < 0) { - throw new IllegalArgumentException("Filter not in filter chain: " + current); - } - for (int j = i - 1; j >= 0; j--) { - Filter f = chain.get(j); - OutputHandler handler = f.out(); + public IoWriteFuture send(FilterContext current, Buffer message) throws IOException { + FilterContext ctx = current.prev; + while (ctx != null) { + OutputHandler handler = ctx.filter.out(); if (handler != null) { return handler.send(message); } + ctx = ctx.prev; } - throw new IllegalStateException("Fell off filter chain in send from " + current); + throw new IllegalStateException("Fell off filter chain in send from " + current.filter); } @Override - public void passOn(Filter current, Readable message) throws Exception { - int i = chain.indexOf(current); - if (i < 0) { - throw new IllegalArgumentException("Filter not in filter chain: " + current); - } - for (int j = i + 1; j < chain.size(); j++) { - Filter f = chain.get(j); - InputHandler handler = f.in(); + public void passOn(FilterContext current, Readable message) throws Exception { + FilterContext ctx = current.next; + while (ctx != null) { + InputHandler handler = ctx.filter.in(); if (handler != null) { handler.received(message); return; } + ctx = ctx.next; } - throw new IllegalStateException("Unhandled message: fell off filter chain in receive after " + current); + throw new IllegalStateException("Unhandled message: fell off filter chain in receive after " + current.filter); } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/filter/Filter.java b/sshd-core/src/main/java/org/apache/sshd/common/filter/Filter.java index 8b6897d15..e9bfb15b4 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/filter/Filter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/Filter.java @@ -19,9 +19,9 @@ package org.apache.sshd.common.filter; /** - * A message filter belonging to a {@link FilterChain}. + * A message filter belonging to a {@link FilterContext} of a {@link FilterChain}. */ -public interface Filter extends Owned<FilterChain> { +public interface Filter extends Owned<FilterContext> { /** * Retrieves the filter's {@link InputHandler}. diff --git a/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java b/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java index 74ec4c464..8adf24b70 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java @@ -21,12 +21,11 @@ package org.apache.sshd.common.filter; import java.io.IOException; import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.session.helpers.AbstractSession; import org.apache.sshd.common.util.Readable; import org.apache.sshd.common.util.buffer.Buffer; /** - * A general chain of {@link Filter}s owned by an {@link AbstractSession}. + * A general chain of {@link Filter}s. */ public interface FilterChain { @@ -37,31 +36,23 @@ public interface FilterChain { * * @param filter to add */ - void addFirst(Filter filter); + FilterContext addFirst(Filter filter); /** * Adds the given filter at the end of the filter chain. * * @param filter to add */ - void addLast(Filter filter); + FilterContext addLast(Filter filter); - void addBefore(Filter toAdd, Filter before); + FilterContext addBefore(Filter toAdd, FilterContext before); - void addAfter(Filter toAdd, Filter after); - - void remove(Filter filter); - - void replace(Filter oldFilter, Filter newFilter); + FilterContext addAfter(Filter toAdd, FilterContext after); Filter getFirst(); Filter getLast(); - Filter getNext(Filter from); - - Filter getPrevious(Filter from); - /** * Pass on an outgoing message to the next filter before {@code current} that has an {@link OutputHandler}. * @@ -70,7 +61,7 @@ public interface FilterChain { * @return an {@link IoWriteFuture} that is fulfilled when the message has been sent. * @throws IOException if an error occurs */ - IoWriteFuture send(Filter current, Buffer message) throws IOException; + IoWriteFuture send(FilterContext current, Buffer message) throws IOException; /** * Pass on an incoming message to the next filter after {@code current} that has an {@link InputHandler}. @@ -79,5 +70,5 @@ public interface FilterChain { * @param message being passed on * @throws Exception if an error occurs */ - void passOn(Filter current, Readable message) throws Exception; + void passOn(FilterContext current, Readable message) throws Exception; } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java b/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterContext.java similarity index 51% copy from sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java copy to sshd-core/src/main/java/org/apache/sshd/common/filter/FilterContext.java index 74ec4c464..7f30ae0b9 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterChain.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/FilterContext.java @@ -19,65 +19,54 @@ package org.apache.sshd.common.filter; import java.io.IOException; +import java.util.Objects; import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.session.helpers.AbstractSession; import org.apache.sshd.common.util.Readable; import org.apache.sshd.common.util.buffer.Buffer; -/** - * A general chain of {@link Filter}s owned by an {@link AbstractSession}. - */ -public interface FilterChain { - - boolean isEmpty(); +public final class FilterContext { - /** - * Adds the given filter at the front of the filter chain. - * - * @param filter to add - */ - void addFirst(Filter filter); + volatile FilterContext prev; - /** - * Adds the given filter at the end of the filter chain. - * - * @param filter to add - */ - void addLast(Filter filter); + volatile FilterContext next; - void addBefore(Filter toAdd, Filter before); + final Filter filter; - void addAfter(Filter toAdd, Filter after); + private final FilterChain chain; - void remove(Filter filter); + FilterContext(FilterChain chain, Filter filter) { + this.chain = Objects.requireNonNull(chain); + this.filter = Objects.requireNonNull(filter); + } - void replace(Filter oldFilter, Filter newFilter); - - Filter getFirst(); - - Filter getLast(); - - Filter getNext(Filter from); - - Filter getPrevious(Filter from); + /** + * Retrieves the {@link FilterChain} containing this context. + * + * @return the {@link FilterChain} + */ + public FilterChain chain() { + return chain; + } /** - * Pass on an outgoing message to the next filter before {@code current} that has an {@link OutputHandler}. + * Pass on an outgoing message to the next filter before this one that has an {@link OutputHandler}. * - * @param current {@link Filter} that is passing on the message * @param message being passed on * @return an {@link IoWriteFuture} that is fulfilled when the message has been sent. * @throws IOException if an error occurs */ - IoWriteFuture send(Filter current, Buffer message) throws IOException; + public IoWriteFuture send(Buffer message) throws IOException { + return chain.send(this, message); + } /** - * Pass on an incoming message to the next filter after {@code current} that has an {@link InputHandler}. + * Pass on an incoming message to the next filter after this one that has an {@link InputHandler}. * - * @param current {@link Filter} that is passing on the message * @param message being passed on * @throws Exception if an error occurs */ - void passOn(Filter current, Readable message) throws Exception; + public void passOn(Readable message) throws Exception { + chain.passOn(this, message); + } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/filter/IoFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/filter/IoFilter.java index a9d40ec8f..f3a4f5626 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/filter/IoFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/filter/IoFilter.java @@ -25,7 +25,7 @@ import java.util.Objects; */ public abstract class IoFilter implements Filter { - private volatile FilterChain chain; + private volatile FilterContext ctx; protected IoFilter() { super(); @@ -37,12 +37,12 @@ public abstract class IoFilter implements Filter { } @Override - public void adding(FilterChain chain) { - this.chain = Objects.requireNonNull(chain); + public void adding(FilterContext context) { + this.ctx = Objects.requireNonNull(context); } @Override - public void added(FilterChain chain) { + public void added(FilterContext context) { // Nothing } @@ -52,13 +52,13 @@ public abstract class IoFilter implements Filter { } @Override - public void removed(FilterChain chain) { - this.chain = null; + public void removed(FilterContext context) { + this.ctx = null; } @Override - public FilterChain owner() { - return chain; + public FilterContext owner() { + return ctx; } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CompressionFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CompressionFilter.java index 1b52bc3f9..35efb3856 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CompressionFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CompressionFilter.java @@ -125,7 +125,7 @@ public class CompressionFilter extends IoFilter { if (comp != null && comp.isCompressionExecuted() && (delayedEnable.get() || !comp.isDelayed())) { decompressed = decompress(comp, message); } - owner().passOn(CompressionFilter.this, decompressed); + owner().passOn(decompressed); } private Buffer decompress(Compression comp, Buffer message) throws IOException { @@ -159,7 +159,7 @@ public class CompressionFilter extends IoFilter { } } } - return owner().send(CompressionFilter.this, message); + return owner().send(message); } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CryptFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CryptFilter.java index 5915a00ef..7dd7404cc 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CryptFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/CryptFilter.java @@ -345,7 +345,7 @@ public class CryptFilter extends IoFilter implements CryptStatisticsProvider { } // Pass it on (directly the slice of this buffer) buffer.wpos(endOfPayload); - owner().passOn(CryptFilter.this, buffer); + owner().passOn(buffer); // Reset buffer positions buffer.rpos(afterPacket); @@ -386,7 +386,7 @@ public class CryptFilter extends IoFilter implements CryptStatisticsProvider { throw new IOException(e.getMessage(), e); } } - return owner().send(CryptFilter.this, encrypted); + return owner().send(encrypted); } private Buffer encode(Buffer packet) throws Exception { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/DelayKexInitFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/DelayKexInitFilter.java index 2999ed5b3..ffe254f80 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/DelayKexInitFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/DelayKexInitFilter.java @@ -21,6 +21,7 @@ package org.apache.sshd.common.session.filters; import java.io.IOException; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.filter.BufferInputHandler; @@ -34,34 +35,34 @@ import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.core.CoreModuleProperties; /** - * A filter implementing the KEX protocol. + * A filter implementing "delayed KEX-INIT" where the client waits for the server's initial KEX-INIT to arrive first + * before sending its own. */ public class DelayKexInitFilter extends IoFilter { - // Currently we just handle the "delayed KEX-INIT", where the client waits for the server's KEX to arrive first. - private AtomicBoolean isFirst = new AtomicBoolean(true); private final DefaultIoWriteFuture initReceived = new DefaultIoWriteFuture(this, null); - private final InputHandler input = new KexInputHandler(); + private final AtomicReference<InputHandler> input = new AtomicReference<>(); - private final OutputHandler output = new KexOutputHandler(); + private final AtomicReference<OutputHandler> output = new AtomicReference<>(); private Session session; public DelayKexInitFilter() { - super(); + input.set(new KexInputHandler()); + output.set(new KexOutputHandler()); } @Override public InputHandler in() { - return input; + return input.get(); } @Override public OutputHandler out() { - return output; + return output.get(); } public void setSession(Session session) { @@ -76,16 +77,18 @@ public class DelayKexInitFilter extends IoFilter { @Override public void handleMessage(Buffer message) throws Exception { - int cmd = message.rawByte(message.rpos()); - if (cmd == SshConstants.SSH_MSG_KEXINIT) { - initReceived.setValue(Boolean.TRUE); + if (input.get() != null) { + int cmd = message.rawByte(message.rpos()); + if (cmd == SshConstants.SSH_MSG_KEXINIT) { + initReceived.setValue(Boolean.TRUE); + input.set(null); + } } - owner().passOn(DelayKexInitFilter.this, message); + owner().passOn(message); } } - // TODO lastWrite? private class KexOutputHandler implements OutputHandler { KexOutputHandler() { @@ -94,17 +97,20 @@ public class DelayKexInitFilter extends IoFilter { @Override public IoWriteFuture send(Buffer message) throws IOException { + if (output.get() == null) { + return owner().send(message); + } int cmd = message.rawByte(message.rpos()); if (cmd != SshConstants.SSH_MSG_KEXINIT) { - return owner().send(DelayKexInitFilter.this, message); + return owner().send(message); } boolean first = isFirst.getAndSet(false); if (!first || session.isServerSession() || CoreModuleProperties.SEND_IMMEDIATE_KEXINIT.getRequired(session).booleanValue()) { - return owner().send(DelayKexInitFilter.this, message); + return owner().send(message).addListener(f -> output.set(null)); } // We're a client, and we delay sending the initial KEX-INIT until we have received the peer's KEX-INIT - IoWriteFuture initial = owner().send(DelayKexInitFilter.this, null); + IoWriteFuture initial = owner().send(null); DefaultIoWriteFuture result = new DefaultIoWriteFuture(KexOutputHandler.this, null); initial.addListener(init -> { Throwable t = init.getException(); @@ -114,7 +120,8 @@ public class DelayKexInitFilter extends IoFilter { } initReceived.addListener(f -> { try { - owner().send(DelayKexInitFilter.this, message).addListener(g -> { + owner().send(message).addListener(g -> { + output.set(null); result.setValue(g.isWritten() ? Boolean.TRUE : g.getException()); }); } catch (IOException e) { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/IdentFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/IdentFilter.java index 81794e752..ecf54b226 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/IdentFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/IdentFilter.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.sshd.common.PropertyResolver; -import org.apache.sshd.common.filter.FilterChain; import org.apache.sshd.common.filter.InputHandler; import org.apache.sshd.common.filter.IoFilter; import org.apache.sshd.common.filter.OutputHandler; @@ -61,12 +60,6 @@ public class IdentFilter extends IoFilter { private final CopyOnWriteArrayList<IdentListener> listeners = new CopyOnWriteArrayList<>(); public IdentFilter() { - super(); - } - - @Override - public void adding(FilterChain chain) { - super.adding(chain); readHandler.set(new ReadHandler()); writeHandler.set(new WriteHandler()); } @@ -119,7 +112,7 @@ public class IdentFilter extends IoFilter { if (haveIdent) { // Something called IdentFilter.in() and got this ReadHandler before we could set it to null below. // Just pass on the message. - owner().passOn(IdentFilter.this, message); + owner().passOn(message); } else { buffer.putBuffer(message); List<String> lines = identHandler.readIdentification(buffer); @@ -129,12 +122,9 @@ public class IdentFilter extends IoFilter { buffer.compact(); received.setValue(Boolean.TRUE); if (buffer.available() > 0) { - owner().passOn(IdentFilter.this, buffer); + owner().passOn(buffer); } readHandler.set(null); - if (writeHandler.get() == null) { - owner().remove(IdentFilter.this); - } } } } @@ -172,14 +162,14 @@ public class IdentFilter extends IoFilter { IoWriteFuture identSent; if (identHandler.isServer() || CoreModuleProperties.SEND_IMMEDIATE_IDENTIFICATION.getRequired(properties).booleanValue()) { - identSent = owner().send(IdentFilter.this, getIdent()); + identSent = owner().send(getIdent()); } else { // We're a client, and we wait for the server's ident to arrive first. DefaultIoWriteFuture delayed = new DefaultIoWriteFuture("DelayedIdent", null); identSent = delayed; received.addListener(identReceived -> { try { - owner().send(IdentFilter.this, getIdent()).addListener(idSent -> { + owner().send(getIdent()).addListener(idSent -> { delayed.setValue(idSent.isWritten() ? Boolean.TRUE : idSent.getException()); }); } catch (IOException e) { @@ -198,11 +188,8 @@ public class IdentFilter extends IoFilter { IoWriteFuture queue = lastWrite.get(); if (queue == null || queue.isDone()) { lastWrite.set(null); - IoWriteFuture result = owner().send(IdentFilter.this, message); + IoWriteFuture result = owner().send(message); writeHandler.set(null); - if (readHandler.get() == null) { - owner().remove(IdentFilter.this); - } return result; } DefaultIoWriteFuture result = new DefaultIoWriteFuture("Ident", null); @@ -211,7 +198,7 @@ public class IdentFilter extends IoFilter { lastWrite.compareAndSet(result, null); if (f.isWritten()) { try { - owner().send(IdentFilter.this, message).addListener(msgSent -> { + owner().send(message).addListener(msgSent -> { result.setValue(msgSent.isWritten() ? Boolean.TRUE : msgSent.getException()); }); } catch (IOException e) { @@ -231,7 +218,7 @@ public class IdentFilter extends IoFilter { } listeners.forEach(listener -> listener.ident(false, ident.get(ident.size() - 1))); String myIdentification = ident.stream().collect(Collectors.joining(CRLF)) + CRLF; - return new ByteArrayBuffer((myIdentification).getBytes(StandardCharsets.UTF_8)); + return new ByteArrayBuffer(myIdentification.getBytes(StandardCharsets.UTF_8)); } } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/InjectIgnoreFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/InjectIgnoreFilter.java index 9f49e46e3..f8bc7110b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/InjectIgnoreFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/InjectIgnoreFilter.java @@ -84,14 +84,14 @@ public class InjectIgnoreFilter extends IoFilter { if (LOG.isDebugEnabled()) { LOG.debug("Injector.send({}) injecting SSH_MSG_IGNORE", resolver); } - owner().send(InjectIgnoreFilter.this, createIgnoreBuffer(length)).addListener(f -> { + owner().send(createIgnoreBuffer(length)).addListener(f -> { Throwable t = f.getException(); if (t != null && (resolver instanceof Session)) { ((Session) resolver).exceptionCaught(t); } }); } - return owner().send(InjectIgnoreFilter.this, message); + return owner().send(message); } private Settings getSettings() { diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java index 18ecda46a..19f46c657 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/filters/kex/KexFilter.java @@ -1137,7 +1137,7 @@ public class KexFilter extends IoFilter { if (isKexNeeded(false)) { startKex(); } - owner().passOn(KexFilter.this, message); + owner().passOn(message); } return; } @@ -1170,7 +1170,7 @@ public class KexFilter extends IoFilter { throw new SshException(SshConstants.SSH2_DISCONNECT_KEY_EXCHANGE_FAILED, MessageFormat .format("{0} not allowed during key exchange", SshConstants.getCommandMessageName(cmd))); } - owner().passOn(KexFilter.this, message); + owner().passOn(message); } } @@ -1180,7 +1180,7 @@ public class KexFilter extends IoFilter { private void passOnBeforeKexInit(int cmd, Buffer message) throws Exception { // TODO: message handling per the class javadoc. - owner().passOn(KexFilter.this, message); + owner().passOn(message); } private void handleKexMessage(KexState state, int cmd, Buffer message) throws Exception { @@ -1231,7 +1231,7 @@ public class KexFilter extends IoFilter { LOG.debug("KexFilter.send({}) {} with packet size {}", getSession(), SshConstants.getCommandMessageName(message.rawByte(message.rpos()) & 0xFF), message.available()); } - return owner().send(KexFilter.this, message); + return owner().send(message); } } } diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index 34f721871..b3f860a3b 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -230,7 +230,7 @@ public abstract class AbstractSession extends SessionHelper { @Override public InputHandler in() { - return message -> owner().passOn(this, message); + return owner()::passOn; } @Override @@ -260,7 +260,7 @@ public abstract class AbstractSession extends SessionHelper { @Override public OutputHandler out() { - return message -> owner().send(this, message); + return owner()::send; } }; filters.addLast(sessionConnector); @@ -336,12 +336,12 @@ public abstract class AbstractSession extends SessionHelper { compressionFilter.setSession(this); filters.addLast(compressionFilter); - filters.addLast(new InjectIgnoreFilter(this, random)); - DelayKexInitFilter delayKexFilter = new DelayKexInitFilter(); delayKexFilter.setSession(this); filters.addLast(delayKexFilter); + filters.addLast(new InjectIgnoreFilter(this, random)); + kexFilter = new KexFilter(this, random, cryptFilter, compressionFilter, new SessionListener() { @Override diff --git a/sshd-core/src/test/java/org/apache/sshd/common/session/filters/FilterTestSupport.java b/sshd-core/src/test/java/org/apache/sshd/common/session/filters/FilterTestSupport.java index d3402ea17..c67e8ca85 100644 --- a/sshd-core/src/test/java/org/apache/sshd/common/session/filters/FilterTestSupport.java +++ b/sshd-core/src/test/java/org/apache/sshd/common/session/filters/FilterTestSupport.java @@ -63,7 +63,7 @@ abstract class FilterTestSupport extends JUnitTestSupport { @Override public InputHandler in() { - return buf -> owner().passOn(this, buf); + return owner()::passOn; } @Override @@ -96,7 +96,7 @@ abstract class FilterTestSupport extends JUnitTestSupport { @Override public OutputHandler out() { - return buf -> owner().send(this, buf); + return owner()::send; } }