[CAMEL-8718] Prevent FTP(s) connection leak after failed auth (producer&consumer)
(cherry picked from commit 8c6dce78c11c91d070ba1894c0a4d71336ad4e03) Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f44084b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f44084b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f44084b Branch: refs/heads/camel-2.14.x Commit: 3f44084b38030e2fb6bd6bf399e1afedc4382d5a Parents: ff476a3 Author: Grzegorz Grzybek <gr.grzy...@gmail.com> Authored: Thu May 7 20:08:35 2015 +0200 Committer: Grzegorz Grzybek <gr.grzy...@gmail.com> Committed: Thu May 7 20:10:19 2015 +0200 ---------------------------------------------------------------------- .../component/file/remote/FtpOperations.java | 2 + .../file/remote/RemoteFileConsumer.java | 11 -- .../remote/FtpBadLoginConnectionLeakTest.java | 18 +-- ...FtpBadLoginInProducerConnectionLeakTest.java | 124 +++++++++++++++ .../FtpBadLoginMockNoopConnectionLeakTest.java | 152 +++++++++++++++++++ 5 files changed, 285 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java index b5384ef..bcca1a0 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java @@ -163,6 +163,8 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> { } log.trace("User {} logged in: {}", username != null ? username : "anonymous", login); if (!login) { + // disconnect to prevent connection leaks + client.disconnect(); throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString()); } client.setFileType(configuration.isBinary() ? FTP.BINARY_FILE_TYPE : FTP.ASCII_FILE_TYPE); http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index 9408ea4..df15b55 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -160,17 +160,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { } } - try { - // we may as well be connected, but not logged in. let's disconnect to prevent connection leak - if (!isConnected && getOperations().isConnected()) { - getOperations().disconnect(); - } - } catch (Exception ex) { - if (log.isDebugEnabled()) { - log.debug("Exception during disconnect: " + ex.getMessage()); - } - } - if (!loggedIn || !isConnected) { if (log.isDebugEnabled()) { log.debug("Not connected/logged in, connecting to: {}", remoteServer()); http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java index 18eb877..13c8a16 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginConnectionLeakTest.java @@ -22,7 +22,6 @@ import java.net.Socket; import java.net.SocketAddress; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; import org.apache.camel.builder.RouteBuilder; @@ -32,24 +31,20 @@ import org.junit.Test; public class FtpBadLoginConnectionLeakTest extends FtpServerTestSupport { - private final AtomicInteger exceptionCount = new AtomicInteger(0); - - private String getFtpUrl() { - return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=0" + - "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf"; - } - /** * Mapping of socket hashcode to two element tab ([connect() called, close() called]) */ private Map<Integer, boolean[]> socketAudits = new HashMap<Integer, boolean[]>(); - @Override + private String getFtpUrl() { + return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber" + + "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf"; + } + @Override protected JndiRegistry createRegistry() throws Exception { JndiRegistry jndi = super.createRegistry(); - final SocketFactory defaultSocketFactory = SocketFactory.getDefault(); SocketFactory sf = new AuditingSocketFactory(); jndi.bind("sf", sf); return jndi; @@ -67,12 +62,13 @@ public class FtpBadLoginConnectionLeakTest extends FtpServerTestSupport { for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) { assertTrue("Socket should be connected", socketStats.getValue()[0]); - assertEquals("Socket should be closed", socketStats.getValue()[1], socketStats.getValue()[0]); + assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]); } mock.assertIsSatisfied(); } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java new file mode 100644 index 0000000..e3314d0 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginInProducerConnectionLeakTest.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; +import javax.net.SocketFactory; + +import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +public class FtpBadLoginInProducerConnectionLeakTest extends FtpServerTestSupport { + + /** + * Mapping of socket hashcode to two element tab ([connect() called, close() called]) + */ + private Map<Integer, boolean[]> socketAudits = new HashMap<>(); + + private String getFtpUrl() { + return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=3" + + "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf"; + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + + SocketFactory sf = new AuditingSocketFactory(); + jndi.bind("sf", sf); + return jndi; + } + + @Test + public void testConnectionLeak() throws Exception { + for (String filename : new String[] { "claus.txt", "grzegorz.txt" }) { + try { + sendFile(getFtpUrl(), "Hello World", filename); + } catch (Exception ignored) { + // expected + } + } + + // maximumReconnectAttempts is related to TCP connects, not to FTP login attempts + // but having this parameter > 0 leads to two connection attempts + assertEquals("Expected 4 socket connections to be created", 4, socketAudits.size()); + + for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) { + assertTrue("Socket should be connected", socketStats.getValue()[0]); + assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]); + } + } + + /** + * {@link SocketFactory} which creates {@link Socket}s that expose statistics about {@link Socket#connect(SocketAddress)}/{@link Socket#close()} + * invocations + */ + private class AuditingSocketFactory extends SocketFactory { + + @Override + public Socket createSocket(String s, int i) throws IOException { + return null; + } + + @Override + public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException { + return null; + } + + @Override + public Socket createSocket(InetAddress inetAddress, int i) throws IOException { + return null; + } + + @Override + public Socket createSocket() throws IOException { + AuditingSocket socket = new AuditingSocket(); + socketAudits.put(System.identityHashCode(socket), new boolean[] { false, false }); + return socket; + } + + @Override + public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException { + return null; + } + } + + /** + * {@link Socket} which counts connect()/close() invocations + */ + private class AuditingSocket extends Socket { + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException { + super.connect(endpoint, timeout); + socketAudits.get(System.identityHashCode(this))[0] = true; + } + + @Override + public synchronized void close() throws IOException { + super.close(); + socketAudits.get(System.identityHashCode(this))[1] = true; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/3f44084b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java new file mode 100644 index 0000000..0b895f5 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpBadLoginMockNoopConnectionLeakTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; +import javax.net.SocketFactory; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.commons.net.ftp.FTPClient; +import org.junit.Test; + +/** + * Test which checks leaking connections when FTP server returns correct status for NOOP operation. + */ +public class FtpBadLoginMockNoopConnectionLeakTest extends FtpServerTestSupport { + + private String getFtpUrl() { + return "ftp://dummy@localhost:" + getPort() + "/badlogin?password=cantremeber&maximumReconnectAttempts=3" + + "&throwExceptionOnConnectFailed=false&ftpClient.socketFactory=#sf"; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + + FtpEndpoint<?> endpoint = context.getEndpoint(getFtpUrl(), FtpEndpoint.class); + endpoint.setFtpClient(new FTPClient() { + @Override + public boolean sendNoOp() throws IOException { + // return true as long as connection is established + return this.isConnected(); + } + }); + } + + /** + * Mapping of socket hashcode to two element tab ([connect() called, close() called]) + */ + private Map<Integer, boolean[]> socketAudits = new HashMap<>(); + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + + SocketFactory sf = new AuditingSocketFactory(); + jndi.bind("sf", sf); + return jndi; + } + + @Test + public void testConnectionLeak() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + // let's have several login attempts + Thread.sleep(3000L); + + stopCamelContext(); + + for (Map.Entry<Integer, boolean[]> socketStats : socketAudits.entrySet()) { + assertTrue("Socket should be connected", socketStats.getValue()[0]); + assertEquals("Socket should be closed", socketStats.getValue()[0], socketStats.getValue()[1]); + } + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from(getFtpUrl()).to("mock:result"); + } + }; + } + + /** + * {@link SocketFactory} which creates {@link Socket}s that expose statistics about {@link Socket#connect(SocketAddress)}/{@link Socket#close()} + * invocations + */ + private class AuditingSocketFactory extends SocketFactory { + + @Override + public Socket createSocket(String s, int i) throws IOException { + return null; + } + + @Override + public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException { + return null; + } + + @Override + public Socket createSocket(InetAddress inetAddress, int i) throws IOException { + return null; + } + + @Override + public Socket createSocket() throws IOException { + AuditingSocket socket = new AuditingSocket(); + socketAudits.put(System.identityHashCode(socket), new boolean[] { false, false }); + return socket; + } + + @Override + public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException { + return null; + } + } + + /** + * {@link Socket} which counts connect()/close() invocations + */ + private class AuditingSocket extends Socket { + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException { + log.info("Connecting socket {}", System.identityHashCode(this)); + super.connect(endpoint, timeout); + socketAudits.get(System.identityHashCode(this))[0] = true; + } + + @Override + public synchronized void close() throws IOException { + log.info("Disconnecting socket {}", System.identityHashCode(this)); + super.close(); + socketAudits.get(System.identityHashCode(this))[1] = true; + } + } + +}