This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch ssh-itest in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit f98f3d9ff81ba5d6fa71d180e3736830cc429ed3 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Nov 26 08:09:32 2020 +0100 Added Integration tests for Camel-Ssh-Kafka-Connector --- tests/itests-ssh/pom.xml | 70 ++++++++++++++++ .../kafkaconnector/ssh/services/SshContainer.java | 45 ++++++++++ .../ssh/services/SshLocalContainerService.java | 54 ++++++++++++ .../ssh/services/SshRemoteService.java | 49 +++++++++++ .../kafkaconnector/ssh/services/SshService.java | 53 ++++++++++++ .../ssh/services/SshServiceFactory.java | 45 ++++++++++ .../ssh/sink/CamelSinkSshITCase.java | 97 ++++++++++++++++++++++ .../ssh/sink/CamelSshPropertyFactory.java | 49 +++++++++++ .../ssh/source/CamelSourceSshITCase.java | 83 ++++++++++++++++++ .../ssh/source/CamelSshPropertyFactory.java | 56 +++++++++++++ .../kafkaconnector/ssh/source/SshTransforms.java | 76 +++++++++++++++++ 11 files changed, 677 insertions(+) diff --git a/tests/itests-ssh/pom.xml b/tests/itests-ssh/pom.xml new file mode 100644 index 0000000..36302ed --- /dev/null +++ b/tests/itests-ssh/pom.xml @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-ssh</artifactId> + <name>Camel-Kafka-Connector :: Tests :: SSH</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-ssh</artifactId> + </dependency> + +<dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.8.0</version> +</dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <phase>test-compile</phase> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + +</project> diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java new file mode 100644 index 0000000..12a452d --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshContainer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.ssh.services; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +/** + * A local instance of an Ubuntu Server with SSH enabled + */ +public class SshContainer extends GenericContainer<SshContainer> { + private static final String SSH_IMAGE = "rastasheep/ubuntu-sshd:14.04"; + private static final int SSH_PORT = 22; + + public SshContainer() { + super(SSH_IMAGE); + + withExposedPorts(SSH_PORT); + + waitingFor(Wait.forListeningPort()); + } + + public int getSSHPort() { + return getMappedPort(SSH_PORT); + } + + public String getSSHHost() { + return getContainerIpAddress(); + } + +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java new file mode 100644 index 0000000..9276890 --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshLocalContainerService.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.ssh.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SshLocalContainerService implements SshService { + private static final Logger LOG = LoggerFactory.getLogger(SshLocalContainerService.class); + + private SshContainer container; + + public SshLocalContainerService() { + container = new SshContainer(); + + container.start(); + } + + @Override + public int getSshPort() { + return container.getSSHPort(); + } + + @Override + public String getSshHost() { + return container.getSSHHost(); + } + + @Override + public void initialize() { + LOG.info("SSH server running at address {}", getSshEndpoint()); + } + + @Override + public void shutdown() { + LOG.info("Stopping the Ssh container"); + container.stop(); + } +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java new file mode 100644 index 0000000..cb5de9c --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshRemoteService.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.ssh.services; + +public class SshRemoteService implements SshService { + + private static final int DEFAULT_SSH_PORT = 22; + + @Override + public void initialize() { + + } + + @Override + public void shutdown() { + + } + + @Override + public int getSshPort() { + String strPort = System.getProperty("ssh.port"); + + if (strPort != null) { + return Integer.parseInt(strPort); + } + + return DEFAULT_SSH_PORT; + } + + @Override + public String getSshHost() { + return System.getProperty("ssh.host"); + } +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java new file mode 100644 index 0000000..d5a375e --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshService.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.ssh.services; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public interface SshService extends BeforeAllCallback, AfterAllCallback { + + int getSshPort(); + + default String getSshEndpoint() { + return getSshHost() + ":" + getSshPort(); + } + + String getSshHost(); + + /** + * Perform any initialization necessary + */ + void initialize(); + + /** + * Shuts down the service after the test has completed + */ + void shutdown(); + + @Override + default void beforeAll(ExtensionContext extensionContext) throws Exception { + initialize(); + } + + @Override + default void afterAll(ExtensionContext extensionContext) throws Exception { + shutdown(); + } +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java new file mode 100644 index 0000000..8e933ef --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/services/SshServiceFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.ssh.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class SshServiceFactory { + private static final Logger LOG = LoggerFactory.getLogger(SshServiceFactory.class); + + private SshServiceFactory() { + + } + + public static SshService createService() { + String instanceType = System.getProperty("ssh.instance.type"); + + if (instanceType == null || instanceType.equals("local-ssh-container")) { + return new SshLocalContainerService(); + } + + if (instanceType.equals("remote")) { + return new SshRemoteService(); + } + + LOG.error("ssh instance must be one of 'local-ssh-container' or 'remote"); + throw new UnsupportedOperationException(String.format("Invalid rabbitmq instance type: %s", instanceType)); + + } +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java new file mode 100644 index 0000000..96fceb1 --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.ssh.sink; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.ssh.services.SshService; +import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.junit.jupiter.api.Assertions.fail; + +@Testcontainers +public class CamelSinkSshITCase extends AbstractKafkaTest { + @RegisterExtension + public static SshService sshService = SshServiceFactory.createService(); + + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSshITCase.class); + + private final int expect = 3; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-ssh-kafka-connector"}; + } + + private void putRecords(CountDownLatch latch) { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + try { + for (int i = 0; i < expect; i++) { + try { + kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "date"); + } catch (ExecutionException e) { + LOG.error("Unable to produce messages: {}", e.getMessage(), e); + } catch (InterruptedException e) { + break; + } + } + } finally { + latch.countDown(); + } + } + + public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + + getKafkaConnectService().initializeConnector(connectorPropertyFactory); + + CountDownLatch latch = new CountDownLatch(1); + ExecutorService service = Executors.newCachedThreadPool(); + service.submit(() -> putRecords(latch)); + + if (!latch.await(30, TimeUnit.SECONDS)) { + fail("Timed out wait for data to be added to the Kafka cluster"); + } + } + + @Timeout(90) + @Test + public void testSshCommand() throws ExecutionException, InterruptedException { + String topic = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withTopics(topic).withHost(sshService.getSshHost()) + .withPort(Integer.toString(sshService.getSshPort())).withUsername("root").withPassword("root"); + + runTest(connectorPropertyFactory); + } +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java new file mode 100644 index 0000000..9ca3dcb --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.ssh.sink; + +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; + +final class CamelSshPropertyFactory extends SinkConnectorPropertyFactory<CamelSshPropertyFactory> { + + private CamelSshPropertyFactory() { + + } + + public CamelSshPropertyFactory withHost(String host) { + return setProperty("camel.sink.path.host", host); + } + + public CamelSshPropertyFactory withPort(String port) { + return setProperty("camel.sink.path.port", port); + } + + public CamelSshPropertyFactory withUsername(String username) { + return setProperty("camel.sink.endpoint.username", username); + } + + public CamelSshPropertyFactory withPassword(String password) { + return setProperty("camel.sink.endpoint.password", password); + } + + public static CamelSshPropertyFactory basic() { + return new CamelSshPropertyFactory().withName("CamelSshSourceConnector").withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.ssh.CamelSshSourceConnector").withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java new file mode 100644 index 0000000..8cd9abf --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.ssh.source; + +import java.util.concurrent.ExecutionException; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.ssh.services.SshService; +import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CamelSourceSshITCase extends AbstractKafkaTest { + @RegisterExtension + public static SshService sshService = SshServiceFactory.createService(); + + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSshITCase.class); + + private final int expect = 1; + private int received; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-ssh-kafka-connector"}; + } + + private <T> boolean checkRecord(ConsumerRecord<String, T> record) { + + LOG.debug("Received: {}", record.value()); + received++; + + return false; + } + + public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + + getKafkaConnectService().initializeConnector(connectorPropertyFactory); + + LOG.debug("Creating the consumer ..."); + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); + LOG.debug("Created the consumer ..."); + + assertEquals(received, expect, "Didn't process the expected amount of messages"); + } + + @Timeout(90) + @Test + public void testRetrieveFromSsh() throws ExecutionException, InterruptedException { + String topic = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withKafkaTopic(topic).withHost(sshService.getSshHost()) + .withPort(Integer.toString(sshService.getSshPort())).withDelay(Integer.toString(10000)).withUsername("root").withPassword("root").withPollcommand("date") + .withTransformsConfig("SshTransforms").withEntry("type", "org.apache.camel.kafkaconnector.ssh.source.SshTransforms").end(); + + runTest(connectorPropertyFactory); + } +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java new file mode 100644 index 0000000..bccd5ff --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSshPropertyFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.ssh.source; + +import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; + +final class CamelSshPropertyFactory extends SourceConnectorPropertyFactory<CamelSshPropertyFactory> { + + private CamelSshPropertyFactory() { + + } + + public CamelSshPropertyFactory withHost(String host) { + return setProperty("camel.source.path.host", host); + } + + public CamelSshPropertyFactory withPort(String port) { + return setProperty("camel.source.path.port", port); + } + + public CamelSshPropertyFactory withDelay(String value) { + return setProperty("camel.source.endpoint.delay", value); + } + + public CamelSshPropertyFactory withUsername(String username) { + return setProperty("camel.source.endpoint.username", username); + } + + public CamelSshPropertyFactory withPassword(String password) { + return setProperty("camel.source.endpoint.password", password); + } + + public CamelSshPropertyFactory withPollcommand(String pollCommand) { + return setProperty("camel.source.endpoint.pollCommand", pollCommand); + } + + public static CamelSshPropertyFactory basic() { + return new CamelSshPropertyFactory().withName("CamelSshSourceConnector").withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.ssh.CamelSshSourceConnector").withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java new file mode 100644 index 0000000..6097d75 --- /dev/null +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/SshTransforms.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.ssh.source; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Map; + +import org.apache.camel.kafkaconnector.utils.SchemaHelper; +import org.apache.commons.io.IOUtils; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SshTransforms<R extends ConnectRecord<R>> implements Transformation<R> { + public static final String FIELD_KEY_CONFIG = "key"; + public static final ConfigDef CONFIG_DEF = new ConfigDef().define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + "Transforms String-based content from Kafka into a map"); + + private static final Logger LOG = LoggerFactory.getLogger(SshTransforms.class); + + @Override + public R apply(R r) { + Object value = r.value(); + + if (r.value() instanceof ByteArrayInputStream) { + LOG.debug("Converting record from Ssh Body Result to text"); + ByteArrayInputStream message = (ByteArrayInputStream)r.value(); + String m = null; + try { + m = IOUtils.toString(message, Charset.defaultCharset()); + } catch (IOException e) { + e.printStackTrace(); + } + + return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), SchemaHelper.buildSchemaBuilderForType(m), m, r.timestamp()); + + } else { + LOG.debug("Unexpected message type: {}", r.value().getClass()); + + return r; + } + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map<String, ?> map) { + + } +}