This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c4b83ee0cfffd7227576524abbdcfbad65125191
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Jan 15 09:27:17 2018 +0100

    CAMEL-12020: Fixed class cast exception in ftp polling consumer rollback.
---
 .../component/file/GenericFilePollingConsumer.java |  6 +-
 .../RemoteFilePollingConsumerPollStrategy.java     | 22 +++---
 .../FromFtpConsumerTemplateRollbackTest.java       | 79 ++++++++++++++++++++++
 .../file/remote/FromFtpConsumerTemplateTest.java   | 72 ++++++++++++++++++++
 4 files changed, 166 insertions(+), 13 deletions(-)

diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
index 34c78f1..c5f4001 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
@@ -136,7 +136,7 @@ public class GenericFilePollingConsumer extends 
EventDrivenPollingConsumer {
                     }
 
                     // mark we are polling which should also include the 
begin/poll/commit
-                    boolean begin = pollStrategy.begin(this, getEndpoint());
+                    boolean begin = pollStrategy.begin(getConsumer(), 
getEndpoint());
                     if (begin) {
                         retryCounter++;
                         polledMessages = getConsumer().poll();
@@ -150,7 +150,7 @@ public class GenericFilePollingConsumer extends 
EventDrivenPollingConsumer {
                             done = false;
                         }
 
-                        pollStrategy.commit(this, getEndpoint(), 
polledMessages);
+                        pollStrategy.commit(getConsumer(), getEndpoint(), 
polledMessages);
                     } else {
                         LOG.debug("Cannot begin polling as pollStrategy 
returned false: {}", pollStrategy);
                     }
@@ -159,7 +159,7 @@ public class GenericFilePollingConsumer extends 
EventDrivenPollingConsumer {
                 LOG.trace("Finished polling: {}", this.getEndpoint());
             } catch (Exception e) {
                 try {
-                    boolean retry = pollStrategy.rollback(this, getEndpoint(), 
retryCounter, e);
+                    boolean retry = pollStrategy.rollback(getConsumer(), 
getEndpoint(), retryCounter, e);
                     if (retry) {
                         // do not set cause as we retry
                         done = false;
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java
index c43a7cc..fea8be2 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java
@@ -29,17 +29,19 @@ public class RemoteFilePollingConsumerPollStrategy extends 
DefaultPollingConsume
 
     @Override
     public boolean rollback(Consumer consumer, Endpoint endpoint, int 
retryCounter, Exception e) throws Exception {
-        RemoteFileConsumer<?> rfc = (RemoteFileConsumer<?>) consumer;
+        if (consumer instanceof RemoteFileConsumer) {
+            RemoteFileConsumer<?> rfc = (RemoteFileConsumer<?>) consumer;
 
-        // only try to recover if we are allowed to run
-        if (((RemoteFileConsumer<?>) consumer).isRunAllowed()) {
-            // disconnect from the server to force it to re login at next poll 
to recover
-            log.warn("Trying to recover by disconnecting from remote server 
forcing a re-connect at next poll: " + rfc.remoteServer());
-            try {
-                rfc.disconnect();
-            } catch (Throwable t) {
-                // ignore the exception
-                log.debug("Error occurred during disconnect from: " + 
rfc.remoteServer() + ". This exception will be ignored.", t);
+            // only try to recover if we are allowed to run
+            if (rfc.isRunAllowed()) {
+                // disconnect from the server to force it to re login at next 
poll to recover
+                log.warn("Trying to recover by disconnecting from remote 
server forcing a re-connect at next poll: " + rfc.remoteServer());
+                try {
+                    rfc.disconnect();
+                } catch (Throwable t) {
+                    // ignore the exception
+                    log.debug("Error occurred during disconnect from: " + 
rfc.remoteServer() + ". This exception will be ignored.", t);
+                }
             }
         }
 
diff --git 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateRollbackTest.java
 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateRollbackTest.java
new file mode 100644
index 0000000..b0a88db
--- /dev/null
+++ 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateRollbackTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FromFtpConsumerTemplateRollbackTest extends FtpServerTestSupport {
+
+    protected String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/deletefile?password=admin&binary=false&delete=true";
+    }
+
+    protected String getFtpUrlInvalid() {
+        // use invalid starting directory and do not allow creating it so we 
force the poll to fail
+        return "ftp://admin@localhost:" + getPort() + "/unknown?password=admin&binary=false&delete=true&autoCreate=false";
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        prepareFtpServer();
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testConsumerTemplateRollback() throws Exception {
+        try {
+            consumer.receiveBody(getFtpUrlInvalid(), 2000, String.class);
+            fail("Should fail and rollback");
+        } catch (Exception e) {
+            GenericFileOperationFailedException ge = 
assertIsInstanceOf(GenericFileOperationFailedException.class, e);
+            assertEquals(550, ge.getCode());
+        }
+    }
+
+    private void prepareFtpServer() throws Exception {
+        // prepares the FTP Server by creating a file on the server that we 
want to unit
+        // test that we can pool and store as a local file
+        Endpoint endpoint = context.getEndpoint(getFtpUrl());
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody("Hello World this file will be deleted");
+        exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+        producer.stop();
+
+        // assert file is created
+        File file = new File(FTP_ROOT_DIR + "/deletefile/hello.txt");
+        assertTrue("The file should exists", file.exists());
+    }
+
+}
\ No newline at end of file
diff --git 
a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateTest.java
 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateTest.java
new file mode 100644
index 0000000..8811fe8
--- /dev/null
+++ 
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FromFtpConsumerTemplateTest extends FtpServerTestSupport {
+
+    protected String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + "/deletefile?password=admin&binary=false&delete=true";
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        prepareFtpServer();
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testConsumerTemplate() throws Exception {
+        String body = consumer.receiveBody(getFtpUrl(), 2000, String.class);
+        assertEquals("Hello World this file will be deleted", body);
+
+        // assert the file is deleted
+        File file = new File(FTP_ROOT_DIR + "/deletefile/hello.txt");
+        assertFalse("The file should have been deleted", file.exists());
+    }
+
+    private void prepareFtpServer() throws Exception {
+        // prepares the FTP Server by creating a file on the server that we 
want to unit
+        // test that we can pool and store as a local file
+        Endpoint endpoint = context.getEndpoint(getFtpUrl());
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody("Hello World this file will be deleted");
+        exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+        producer.stop();
+
+        // assert file is created
+        File file = new File(FTP_ROOT_DIR + "/deletefile/hello.txt");
+        assertTrue("The file should exists", file.exists());
+    }
+
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <commits@camel.apache.org>.

Reply via email to