This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch sandbox/camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 64faf580bfbd31957b9a977fdb1b8a9ba530359e Author: swalendzik <seweryn.walend...@gmail.com> AuthorDate: Wed Dec 12 10:19:16 2018 +0100 CAMEL-12991 setting processStrategy for sftp/ftp endpoints (#2665) --- .../camel/component/file/remote/FtpEndpoint.java | 2 +- .../camel/component/file/remote/SftpEndpoint.java | 2 +- .../remote/FtpConsumerProcessStrategyTest.java | 90 +++++++++++++++++++++ .../sftp/SftpConsumerProcessStrategyTest.java | 93 ++++++++++++++++++++++ 4 files changed, 185 insertions(+), 2 deletions(-) diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java index e7ce84d..0c0b61f 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java @@ -96,7 +96,7 @@ public class FtpEndpoint<T extends FTPFile> extends RemoteFileEndpoint<FTPFile> @Override protected RemoteFileConsumer<FTPFile> buildConsumer(Processor processor) { try { - return new FtpConsumer(this, processor, createRemoteFileOperations(), createGenericFileStrategy()); + return new FtpConsumer(this, processor, createRemoteFileOperations(), processStrategy != null ? processStrategy : createGenericFileStrategy()); } catch (Exception e) { throw new FailedToCreateConsumerException(this, e); } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java index 069dfa7..41456f5 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java @@ -62,7 +62,7 @@ public class SftpEndpoint extends RemoteFileEndpoint<SftpRemoteFile> { @Override protected RemoteFileConsumer<SftpRemoteFile> buildConsumer(Processor processor) { - return new SftpConsumer(this, processor, createRemoteFileOperations(), createGenericFileStrategy()); + return new SftpConsumer(this, processor, createRemoteFileOperations(), processStrategy != null ? processStrategy : createGenericFileStrategy()); } protected GenericFileProducer<SftpRemoteFile> buildProducer() { diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java new file mode 100644 index 0000000..13b2c45 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerProcessStrategyTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import org.apache.camel.Exchange; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.component.file.GenericFileEndpoint; +import org.apache.camel.component.file.GenericFileOperations; +import org.apache.camel.component.file.GenericFileProcessStrategy; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +public class FtpConsumerProcessStrategyTest extends FtpServerTestSupport { + + private MyStrategy myStrategy; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + myStrategy = new MyStrategy(); + jndi.bind("myStrategy", myStrategy); + return jndi; + } + + private String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?password=admin&processStrategy=#myStrategy"; + } + + @Test + public void testFtpConsume() throws Exception { + if (!canTest()) { + return; + } + + sendFile(getFtpUrl(), "Hello World", "hello.txt"); + + String out = consumer.receiveBody(getFtpUrl(), 5000, String.class); + assertNotNull(out); + assertTrue(out.startsWith("Hello World")); + assertEquals("Begin should have been invoked 1 times", 1, myStrategy.getInvoked()); + } + + private static class MyStrategy implements GenericFileProcessStrategy { + + private volatile int invoked; + + @Override + public void prepareOnStartup(GenericFileOperations operations, GenericFileEndpoint endpoint) throws Exception { + //noop + } + + @Override + public boolean begin(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception { + return true; + } + + @Override + public void abort(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception { + //noop + } + + @Override + public void commit(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception { + invoked++; + } + + @Override + public void rollback(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception { + //noop + } + + int getInvoked() { + return invoked; + } + } +} \ No newline at end of file diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java new file mode 100644 index 0000000..d314728 --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerProcessStrategyTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote.sftp; + +import org.apache.camel.Exchange; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.component.file.GenericFileEndpoint; +import org.apache.camel.component.file.GenericFileOperations; +import org.apache.camel.component.file.GenericFileProcessStrategy; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +public class SftpConsumerProcessStrategyTest extends SftpServerTestSupport { + + private MyStrategy myStrategy; + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + myStrategy = new MyStrategy(); + jndi.bind("myStrategy", myStrategy); + return jndi; + } + + @Test + public void testSftpConsume() throws Exception { + if (!canTest()) { + return; + } + + // create file using regular file + template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, "Hello World", Exchange.FILE_NAME, "hello.txt"); + + String out = consumer.receiveBody("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&processStrategy=#myStrategy", 5000, String.class); + assertNotNull(out); + // Apache SSHD appends \u0000 at last byte in retrieved file + assertTrue(out.startsWith("Hello World")); + assertEquals("CustomProcessStrategy should have been invoked 1 times", 1, myStrategy.getInvoked()); + } + + private static class MyStrategy implements GenericFileProcessStrategy { + + private volatile int invoked; + + @Override + public void prepareOnStartup(GenericFileOperations operations, GenericFileEndpoint endpoint) throws Exception { + //noop + } + + @Override + public boolean begin(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception { + return true; + } + + @Override + public void abort(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception { + //noop + } + + @Override + public void commit(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception { + invoked++; + } + + @Override + public void rollback(GenericFileOperations operations, GenericFileEndpoint endpoint, Exchange exchange, GenericFile file) throws Exception { + //noop + } + + int getInvoked() { + return invoked; + } + } +}