CAMEL-8088: FTP can wait indefinitely when a connection timeout occurs during 
connect. Thanks to Bob Browning for the test.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/29dcb64f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/29dcb64f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/29dcb64f

Branch: refs/heads/camel-2.13.x
Commit: 29dcb64fd5001a80294ba9bbd0e2e4e4d683050e
Parents: e26a086
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Feb 15 12:23:22 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Feb 15 12:23:51 2015 +0100

----------------------------------------------------------------------
 .../file/remote/RemoteFileProducer.java         |   4 +-
 tests/camel-itest/pom.xml                       |  13 ++
 .../itest/ftp/FtpInitialConnectTimeoutTest.java | 171 +++++++++++++++++++
 3 files changed, 186 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/29dcb64f/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
index 87882aa..d1ed723 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
@@ -176,14 +176,14 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> implements Ser
 
         // recover by re-creating operations which should most likely be able to recover
         if (!loggedIn) {
-            log.debug("Trying to recover connection to: {} with a fresh client.", getEndpoint());
+            log.debug("Trying to recover connection to: {} with a new FTP client.", getEndpoint());
             setOperations(getEndpoint().createRemoteFileOperations());
             connectIfNecessary();
         }
     }
 
     protected void connectIfNecessary() throws GenericFileOperationFailedException {
-        if (!getOperations().isConnected()) {
+        if (!loggedIn || !getOperations().isConnected()) {
             log.debug("Not already connected/logged in. Connecting to: {}", getEndpoint());
             RemoteFileConfiguration config = getEndpoint().getConfiguration();
             loggedIn = getOperations().connect(config);

http://git-wip-us.apache.org/repos/asf/camel/blob/29dcb64f/tests/camel-itest/pom.xml
----------------------------------------------------------------------
diff --git a/tests/camel-itest/pom.xml b/tests/camel-itest/pom.xml
index c436690..15d17bf 100644
--- a/tests/camel-itest/pom.xml
+++ b/tests/camel-itest/pom.xml
@@ -218,6 +218,19 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- mocking ftp server -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockftpserver</groupId>
+      <artifactId>MockFtpServer</artifactId>
+      <version>2.5</version>
+      <scope>test</scope>
+    </dependency>
+
     <!-- some TX tests using iBatis -->
     <dependency>
       <groupId>org.apache.derby</groupId>

http://git-wip-us.apache.org/repos/asf/camel/blob/29dcb64f/tests/camel-itest/src/test/java/org/apache/camel/itest/ftp/FtpInitialConnectTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/camel-itest/src/test/java/org/apache/camel/itest/ftp/FtpInitialConnectTimeoutTest.java b/tests/camel-itest/src/test/java/org/apache/camel/itest/ftp/FtpInitialConnectTimeoutTest.java
new file mode 100644
index 0000000..0d44c3b
--- /dev/null
+++ b/tests/camel-itest/src/test/java/org/apache/camel/itest/ftp/FtpInitialConnectTimeoutTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.ftp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.SocketFactory;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.commons.net.ftp.FTPClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FtpInitialConnectTimeoutTest extends CamelTestSupport {
+
+    private static final int CONNECT_TIMEOUT = 11223;
+
+    /**
+     * Create the answer for the socket factory that causes a SocketTimeoutException to occur in connect.
+     */
+    private static class SocketAnswer implements Answer<Socket> {
+
+        @Override
+        public Socket answer(InvocationOnMock invocation) throws Throwable {
+            final Socket socket = Mockito.spy(new Socket());
+            final AtomicBoolean timeout = new AtomicBoolean();
+
+            try {
+                doAnswer(new Answer<InputStream>() {
+                    @Override
+                    public InputStream answer(InvocationOnMock invocation) throws Throwable {
+                        final InputStream stream = (InputStream) invocation.callRealMethod();
+
+                        InputStream inputStream = new InputStream() {
+                            @Override
+                            public int read() throws IOException {
+                                if (timeout.get()) {
+                                    // emulate a timeout occurring in _getReply()
+                                    throw new SocketTimeoutException();
+                                }
+                                return stream.read();
+                            }
+                        };
+
+                        return inputStream;
+                    }
+                }).when(socket).getInputStream();
+            } catch (IOException ignored) {
+            }
+
+            try {
+                doAnswer(new Answer() {
+                    @Override
+                    public Object answer(InvocationOnMock invocation) throws Throwable {
+                        if ((Integer) invocation.getArguments()[0] == CONNECT_TIMEOUT) {
+                            // setting of connect timeout
+                            timeout.set(true);
+                        } else {
+                            // non-connect timeout
+                            timeout.set(false);
+                        }
+                        return invocation.callRealMethod();
+                    }
+                }).when(socket).setSoTimeout(anyInt());
+            } catch (SocketException e) {
+                throw new RuntimeException(e);
+            }
+            return socket;
+        }
+    }
+
+    private FakeFtpServer fakeFtpServer;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        fakeFtpServer = new FakeFtpServer();
+        fakeFtpServer.setServerControlPort(0);
+        fakeFtpServer.start();
+
+        super.setUp();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (fakeFtpServer != null) {
+            fakeFtpServer.stop();
+        }
+    }
+
+    private FTPClient mockedClient() throws IOException {
+        FTPClient client = new FTPClient();
+        client.setSocketFactory(createSocketFactory());
+        return client;
+    }
+
+    private SocketFactory createSocketFactory() throws IOException {
+        SocketFactory socketFactory = mock(SocketFactory.class);
+        when(socketFactory.createSocket()).thenAnswer(new SocketAnswer());
+        return socketFactory;
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("mocked", mockedClient());
+        return registry;
+    }
+
+    @Test
+    public void testReConnect() throws Exception {
+        // we should fail, but we are testing that we are not in a deadlock which could potentially happen
+        getMockEndpoint("mock:done").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        sendBody("direct:start", "test");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                // using soTimeout=0 could potentially cause the ftp producer to dead-lock doing endless reconnection attempts
+                // this is a test to ensure we have fixed that
+                from("direct:start")
+                        .to("ftp://localhost:" + fakeFtpServer.getServerControlPort()
+                                + "?ftpClient=#mocked"
+                                + "&soTimeout=0&"
+                                + "connectTimeout=" + CONNECT_TIMEOUT)
+                        .to("mock:done");
+            }
+        };
+    }
+}
\ No newline at end of file

Reply via email to