Author: davsclaus
Date: Sun Aug  8 09:06:41 2010
New Revision: 983366

URL: http://svn.apache.org/viewvc?rev=983366&view=rev
Log:
CAMEL-3023: ftp component detects interrupt during connection, to cater for 
task being cancelled or JVM shutdown interrupting.

Added:
    
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerRecipientListParallelTimeoutTest.java
      - copied, changed from r983215, 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerTempPrefixTest.java
Modified:
    
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
    
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
    
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
    camel/trunk/components/camel-ftp/src/test/resources/log4j.properties

Modified: 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java?rev=983366&r1=983365&r2=983366&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
 (original)
+++ 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
 Sun Aug  8 09:06:41 2010
@@ -100,6 +100,11 @@ public class FtpOperations implements Re
                     throw new 
GenericFileOperationFailedException(client.getReplyCode(), 
client.getReplyString(), "Server refused connection");
                 }
             } catch (Exception e) {
+                // check if we are interrupted so we can break out
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new GenericFileOperationFailedException("Interrupted 
during connecting", new InterruptedException("Interrupted during connecting"));
+                }
+
                 GenericFileOperationFailedException failed;
                 if (e instanceof GenericFileOperationFailedException) {
                     failed = (GenericFileOperationFailedException) e;
@@ -118,7 +123,9 @@ public class FtpOperations implements Re
                     try {
                         Thread.sleep(endpoint.getReconnectDelay());
                     } catch (InterruptedException ie) {
-                        // ignore
+                        // we could potentially also be interrupted during 
sleep
+                        Thread.currentThread().interrupt();
+                        throw new 
GenericFileOperationFailedException("Interrupted during sleeping", ie);
                     }
                 }
             }

Modified: 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java?rev=983366&r1=983365&r2=983366&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
 (original)
+++ 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
 Sun Aug  8 09:06:41 2010
@@ -21,6 +21,7 @@ import org.apache.camel.ServicePoolAware
 import org.apache.camel.component.file.GenericFileOperationFailedException;
 import org.apache.camel.component.file.GenericFileProducer;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * Generic remote file producer for all the FTP variations.
@@ -162,10 +163,20 @@ public class RemoteFileProducer<T> exten
         try {
             connectIfNecessary();
         } catch (Exception e) {
+            loggedIn = false;
+
+            // are we interrupted
+            InterruptedException ie = 
ObjectHelper.getException(InterruptedException.class, e);
+            if (ie != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Interrupted during connect to: " + 
getEndpoint(), ie);
+                }
+                throw ie;
+            }
+
             if (log.isDebugEnabled()) {
                 log.debug("Could not connect to: " + getEndpoint() + ". Will 
try to recover.", e);
             }
-            loggedIn = false;
         }
 
         // recover by re-creating operations which should most likely be able 
to recover

Modified: 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java?rev=983366&r1=983365&r2=983366&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
 (original)
+++ 
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
 Sun Aug  8 09:06:41 2010
@@ -105,6 +105,11 @@ public class SftpOperations implements R
                 // yes we could connect
                 connected = true;
             } catch (Exception e) {
+                // check if we are interrupted so we can break out
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new GenericFileOperationFailedException("Interrupted 
during connecting", new InterruptedException("Interrupted during connecting"));
+                }
+
                 GenericFileOperationFailedException failed = new 
GenericFileOperationFailedException("Cannot connect to " + 
configuration.remoteServerInformation(), e);
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Cannot connect due: " + failed.getMessage());
@@ -116,8 +121,10 @@ public class SftpOperations implements R
                 if (endpoint.getReconnectDelay() > 0) {
                     try {
                         Thread.sleep(endpoint.getReconnectDelay());
-                    } catch (InterruptedException e1) {
-                        // ignore
+                    } catch (InterruptedException ie) {
+                        // we could potentially also be interrupted during 
sleep
+                        Thread.currentThread().interrupt();
+                        throw new 
GenericFileOperationFailedException("Interrupted during sleeping", ie);
                     }
                 }
             }

Copied: 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerRecipientListParallelTimeoutTest.java
 (from r983215, 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerTempPrefixTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerRecipientListParallelTimeoutTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerRecipientListParallelTimeoutTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerTempPrefixTest.java&r1=983215&r2=983366&rev=983366&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerTempPrefixTest.java
 (original)
+++ 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerRecipientListParallelTimeoutTest.java
 Sun Aug  8 09:06:41 2010
@@ -16,32 +16,62 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.File;
-
-import org.apache.camel.converter.IOConverter;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
  * Unit test to verify that Camel can build remote directory on FTP server if 
missing (full or part of).
  */
-public class FtpProducerTempPrefixTest extends FtpServerTestSupport {
-
-    @Override
-    public boolean isUseRouteBuilder() {
-        return false;
-    }
+@Ignore("Run this test manually")
+public class FtpProducerRecipientListParallelTimeoutTest extends 
FtpServerTestSupport {
 
     private String getFtpUrl() {
-        return "ftp://admin@localhost:" + getPort() + 
"/upload/user/claus?binary=false&password=admin&tempPrefix=.uploading";
+        return "ftp://admin:admin@127.0.0.2:" + (getPort() - 1) + "/timeout";
     }
 
     @Test
-    public void testProduceTempPrefixTest() throws Exception {
-        sendFile(getFtpUrl(), "Hello World", "claus.txt");
+    public void testRecipientListTimeout() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // B will timeout so we only get A and C
+        mock.expectedBodiesReceived("AC");
+
+        template.sendBodyAndHeader("direct:start", "Hello", "slip", 
"direct:a," + getFtpUrl() + ",direct:c");
 
-        File file = new File(FTP_ROOT_DIR + "upload/user/claus/claus.txt");
-        file = file.getAbsoluteFile();
-        assertTrue("The uploaded file should exists", file.exists());
-        assertEquals("Hello World", IOConverter.toString(file, null));
+        assertMockEndpointsSatisfied();
     }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.getShutdownStrategy().setTimeout(60);
+
+                from("direct:start")
+                    .recipientList(header("slip")).aggregationStrategy(
+                            new AggregationStrategy() {
+                            public Exchange aggregate(Exchange oldExchange, 
Exchange newExchange) {
+                                if (oldExchange == null) {
+                                    return newExchange;
+                                }
+
+                                String body = 
oldExchange.getIn().getBody(String.class);
+                                oldExchange.getIn().setBody(body + 
newExchange.getIn().getBody(String.class));
+                                return oldExchange;
+                            }
+                        })
+                        .parallelProcessing().timeout(2000)
+                    .to("mock:result");
+
+                from("direct:a").setBody(constant("A"));
+
+                from("direct:c").delay(500).setBody(constant("C"));
+            }
+        };
+    }
+
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties?rev=983366&r1=983365&r2=983366&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/resources/log4j.properties 
(original)
+++ camel/trunk/components/camel-ftp/src/test/resources/log4j.properties Sun 
Aug  8 09:06:41 2010
@@ -22,6 +22,7 @@ log4j.rootLogger=INFO, file
 
 # uncomment the following to enable camel debugging
 #log4j.logger.org.apache.camel.component.file=TRACE
+#log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE
 #log4j.logger.org.apache.camel=DEBUG
 log4j.logger.org.apache.mina=WARN
 log4j.logger.org.apache.ftpserver=WARN


Reply via email to