This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit c4b83ee0cfffd7227576524abbdcfbad65125191 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jan 15 09:27:17 2018 +0100 CAMEL-12020: Fixed class cast exception in ftp polling consumer rollback. --- .../component/file/GenericFilePollingConsumer.java | 6 +- .../RemoteFilePollingConsumerPollStrategy.java | 22 +++--- .../FromFtpConsumerTemplateRollbackTest.java | 79 ++++++++++++++++++++++ .../file/remote/FromFtpConsumerTemplateTest.java | 72 ++++++++++++++++++++ 4 files changed, 166 insertions(+), 13 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java index 34c78f1..c5f4001 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java @@ -136,7 +136,7 @@ public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { } // mark we are polling which should also include the begin/poll/commit - boolean begin = pollStrategy.begin(this, getEndpoint()); + boolean begin = pollStrategy.begin(getConsumer(), getEndpoint()); if (begin) { retryCounter++; polledMessages = getConsumer().poll(); @@ -150,7 +150,7 @@ public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { done = false; } - pollStrategy.commit(this, getEndpoint(), polledMessages); + pollStrategy.commit(getConsumer(), getEndpoint(), polledMessages); } else { LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); } @@ -159,7 +159,7 @@ public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { LOG.trace("Finished polling: {}", this.getEndpoint()); } catch (Exception e) { try { - boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e); + boolean retry = pollStrategy.rollback(getConsumer(), getEndpoint(), retryCounter, e); if (retry) { // do not set cause as we retry done = false; diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java index c43a7cc..fea8be2 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFilePollingConsumerPollStrategy.java @@ -29,17 +29,19 @@ public class RemoteFilePollingConsumerPollStrategy extends DefaultPollingConsume @Override public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { - RemoteFileConsumer<?> rfc = (RemoteFileConsumer<?>) consumer; + if (consumer instanceof RemoteFileConsumer) { + RemoteFileConsumer<?> rfc = (RemoteFileConsumer<?>) consumer; - // only try to recover if we are allowed to run - if (((RemoteFileConsumer<?>) consumer).isRunAllowed()) { - // disconnect from the server to force it to re login at next poll to recover - log.warn("Trying to recover by disconnecting from remote server forcing a re-connect at next poll: " + rfc.remoteServer()); - try { - rfc.disconnect(); - } catch (Throwable t) { - // ignore the exception - log.debug("Error occurred during disconnect from: " + rfc.remoteServer() + ". This exception will be ignored.", t); + // only try to recover if we are allowed to run + if (rfc.isRunAllowed()) { + // disconnect from the server to force it to re login at next poll to recover + log.warn("Trying to recover by disconnecting from remote server forcing a re-connect at next poll: " + rfc.remoteServer()); + try { + rfc.disconnect(); + } catch (Throwable t) { + // ignore the exception + log.debug("Error occurred during disconnect from: " + rfc.remoteServer() + ". This exception will be ignored.", t); + } } } diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateRollbackTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateRollbackTest.java new file mode 100644 index 0000000..b0a88db --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateRollbackTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.File; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Producer; +import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.junit.Before; +import org.junit.Test; + +public class FromFtpConsumerTemplateRollbackTest extends FtpServerTestSupport { + + protected String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/deletefile?password=admin&binary=false&delete=true"; + } + + protected String getFtpUrlInvalid() { + // use invalid starting directory and do not allow creating it so we force the poll to fail + return "ftp://admin@localhost:" + getPort() + "/unknown?password=admin&binary=false&delete=true&autoCreate=false"; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + prepareFtpServer(); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testConsumerTemplateRollback() throws Exception { + try { + consumer.receiveBody(getFtpUrlInvalid(), 2000, String.class); + fail("Should fail and rollback"); + } catch (Exception e) { + GenericFileOperationFailedException ge = assertIsInstanceOf(GenericFileOperationFailedException.class, e); + assertEquals(550, ge.getCode()); + } + } + + private void prepareFtpServer() throws Exception { + // prepares the FTP Server by creating a file on the server that we want to unit + // test that we can pool and store as a local file + Endpoint endpoint = context.getEndpoint(getFtpUrl()); + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody("Hello World this file will be deleted"); + exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt"); + Producer producer = endpoint.createProducer(); + producer.start(); + producer.process(exchange); + producer.stop(); + + // assert file is created + File file = new File(FTP_ROOT_DIR + "/deletefile/hello.txt"); + assertTrue("The file should exists", file.exists()); + } + +} \ No newline at end of file diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateTest.java new file mode 100644 index 0000000..8811fe8 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpConsumerTemplateTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import java.io.File; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Producer; +import org.junit.Before; +import org.junit.Test; + +public class FromFtpConsumerTemplateTest extends FtpServerTestSupport { + + protected String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/deletefile?password=admin&binary=false&delete=true"; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + prepareFtpServer(); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testConsumerTemplate() throws Exception { + String body = consumer.receiveBody(getFtpUrl(), 2000, String.class); + assertEquals("Hello World this file will be deleted", body); + + // assert the file is deleted + File file = new File(FTP_ROOT_DIR + "/deletefile/hello.txt"); + assertFalse("The file should have been deleted", file.exists()); + } + + private void prepareFtpServer() throws Exception { + // prepares the FTP Server by creating a file on the server that we want to unit + // test that we can pool and store as a local file + Endpoint endpoint = context.getEndpoint(getFtpUrl()); + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody("Hello World this file will be deleted"); + exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt"); + Producer producer = endpoint.createProducer(); + producer.start(); + producer.process(exchange); + producer.stop(); + + // assert file is created + File file = new File(FTP_ROOT_DIR + "/deletefile/hello.txt"); + assertTrue("The file should exists", file.exists()); + } + +} \ No newline at end of file -- To stop receiving notification emails like this one, please contact "commits@camel.apache.org" <commits@camel.apache.org>.