This is an automated email from the ASF dual-hosted git repository. ffang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 224ae8f #873 initial cxf Source/Sink connectors test (#940) 224ae8f is described below commit 224ae8f9a709b780636e1a999e4792c201cb389a Author: Freeman(Yue) Fang <freeman.f...@gmail.com> AuthorDate: Tue Feb 9 10:34:40 2021 -0500 #873 initial cxf Source/Sink connectors test (#940) * #873 initial cxf Source/Sink connectors test * revise according to feedback --- tests/itests-cxf/pom.xml | 100 +++++++++++ .../cxf/sink/CamelSinkCXFITCase.java | 189 +++++++++++++++++++++ .../cxf/sink/CamelSinkCXFPropertyFactory.java | 58 +++++++ .../camel/kafkaconnector/cxf/sink/GreeterImpl.java | 30 ++++ .../kafkaconnector/cxf/sink/HelloServiceImpl.java | 81 +++++++++ .../cxf/source/CamelSourceCXFITCase.java | 181 ++++++++++++++++++++ .../cxf/source/CamelSourceCXFPropertyFactory.java | 64 +++++++ .../kafkaconnector/cxf/source/HelloService.java | 35 ++++ 8 files changed, 738 insertions(+) diff --git a/tests/itests-cxf/pom.xml b/tests/itests-cxf/pom.xml new file mode 100644 index 0000000..88ea1fa --- /dev/null +++ b/tests/itests-cxf/pom.xml @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.8.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-cxf</artifactId> + <name>Camel-Kafka-Connector :: Tests :: CXF</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-common</artifactId> + <version>${camel.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-dispatch-router</artifactId> + <version>${camel.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cxf</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-transports-http-jetty</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-security</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-continuation</artifactId> + </exclusion> + 
<exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-testutils</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + +</project> diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java new file mode 100644 index 0000000..61c01c1 --- /dev/null +++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.ext.logging.LoggingInInterceptor;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.apache.cxf.jaxws.EndpointImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration test for the CXF sink connector.
+ *
+ * <p>Before each test a simple-frontend service ({@link HelloServiceImpl}) and a JAX-WS service
+ * ({@link GreeterImpl}) are published on free local ports. Each test then starts the sink
+ * connector against one of those addresses and produces {@code expect} copies of a raw SOAP
+ * envelope to the test topic.
+ *
+ * <p>NOTE(review): the tests only fail when an exception is thrown; nothing asserts that the
+ * published services were actually invoked.
+ */
+public class CamelSinkCXFITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCXFITCase.class);
+
+    /** Number of records produced to the topic by {@link #putRecords(String)}. */
+    private final int expect = 10;
+
+    private final int simplePort = NetworkUtils.getFreePort("localhost");
+    private final int jaxwsPort = NetworkUtils.getFreePort("localhost");
+
+    protected static final String ECHO_OPERATION = "echo";
+    protected static final String GREET_ME_OPERATION = "greetMe";
+
+    /** Raw SOAP request for the {@code echo} operation of the simple-frontend service. */
+    protected static final String TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+        + "<soap:Body><ns1:echo xmlns:ns1=\"http://source.cxf.kafkaconnector.camel.apache.org/\">"
+        + "<arg0 xmlns=\"http://source.cxf.kafkaconnector.camel.apache.org/\">hello world</arg0>"
+        + "</ns1:echo></soap:Body></soap:Envelope>";
+
+    /**
+     * Raw SOAP request for the {@code greetMe} operation of the JAX-WS service.
+     * The literal previously embedded pasted source fragments (quote, newline and '+' characters)
+     * between the XML elements; it is now a clean envelope.
+     */
+    protected static final String JAXWS_TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+        + "<soap:Body><ns1:greetMe xmlns:ns1=\"http://apache.org/hello_world_soap_http/types\">"
+        + "<requestType xmlns=\"http://apache.org/hello_world_soap_http/types\">hello world!</requestType>"
+        + "</ns1:greetMe></soap:Body></soap:Envelope>";
+
+    protected Server server;
+    protected EndpointImpl endpoint;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-cxf-kafka-connector"};
+    }
+
+    /** @return the HTTP address at which the simple-frontend service is published */
+    protected String getSimpleServerAddress() {
+        return "http://localhost:" + simplePort + "/" + getClass().getSimpleName() + "/simpletest";
+    }
+
+    /** @return the HTTP address at which the JAX-WS service is published */
+    protected String getJaxWsServerAddress() {
+        return "http://localhost:" + jaxwsPort + "/" + getClass().getSimpleName() + "/jaxwstest";
+    }
+
+    /**
+     * Publishes both backing services and attaches CXF logging interceptors to them.
+     */
+    @BeforeEach
+    public void setUp() throws IOException {
+        // start a simple front service
+        ServerFactoryBean svrBean = new ServerFactoryBean();
+        svrBean.setAddress(getSimpleServerAddress());
+        svrBean.setServiceClass(HelloService.class);
+        svrBean.setServiceBean(new HelloServiceImpl());
+        svrBean.setBus(BusFactory.getDefaultBus());
+        server = svrBean.create();
+        server.getEndpoint().getInInterceptors().add(new LoggingInInterceptor());
+        server.getEndpoint().getOutInterceptors().add(new LoggingOutInterceptor());
+        // start a jaxws front service
+        GreeterImpl greeterImpl = new GreeterImpl();
+        endpoint = (EndpointImpl)Endpoint.publish(getJaxWsServerAddress(), greeterImpl);
+        endpoint.getInInterceptors().add(new LoggingInInterceptor());
+        endpoint.getOutInterceptors().add(new LoggingOutInterceptor());
+    }
+
+    /**
+     * Stops whatever {@link #setUp()} managed to publish. The null checks keep a failure in
+     * {@code setUp} from being masked by a NullPointerException here.
+     */
+    @AfterEach
+    public void tearDown() {
+        if (endpoint != null) {
+            endpoint.stop();
+        }
+        if (server != null) {
+            server.stop();
+            server.destroy();
+        }
+    }
+
+    /**
+     * Produces {@code expect} copies of {@code message} to the default test topic.
+     * A failed send is logged and the loop continues; an interrupt ends the loop.
+     *
+     * @param message the record value to produce
+     */
+    private void putRecords(String message) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        for (int i = 0; i < expect; i++) {
+            try {
+                kafkaClient.produce(topic, message);
+            } catch (ExecutionException e) {
+                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+            } catch (InterruptedException e) {
+                // restore the flag so the owning executor thread can observe the interruption
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+    }
+
+    /**
+     * Starts the connector described by {@code connectorPropertyFactory}, produces the test
+     * records on a background thread and waits five seconds for them to be processed.
+     *
+     * @param connectorPropertyFactory the connector configuration to deploy
+     * @param message the record value to produce
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message) throws ExecutionException, InterruptedException, TimeoutException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+        ExecutorService service = Executors.newCachedThreadPool();
+        try {
+            service.submit(() -> this.putRecords(message));
+            Thread.sleep(5000);
+        } finally {
+            // shutdown() lets the already-submitted producer task finish but releases the pool
+            service.shutdown();
+        }
+        LOG.debug("Created the consumer ... About to receive messages");
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(getSimpleServerAddress())
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
+                    .withDataFormat("RAW");
+
+            runTest(connectorPropertyFactory, TEST_MESSAGE);
+        } catch (Exception e) {
+            // single placeholder: the trailing throwable is then logged with its stack trace
+            LOG.error("CXF Sink test failed: {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testJaxWsBasicSendReceiveUsingUrl() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory.basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(this.getJaxwsEndpointUri())
+                    .withDataFormat("RAW");
+
+            runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE);
+        } catch (Exception e) {
+            // single placeholder: the trailing throwable is then logged with its stack trace
+            LOG.error("CXF Sink test failed: {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        }
+    }
+
+    /** @return the simple service address with the {@code serviceClass} query option appended */
+    protected String getSimpleEndpointUri() {
+        return getSimpleServerAddress()
+               + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService";
+    }
+
+    /** @return the JAX-WS service address with the {@code serviceClass} query option appended */
+    protected String getJaxwsEndpointUri() {
+        return getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter";
+    }
+
+
+}
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
new file mode 100644
index 0000000..e7ed6a7
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFPropertyFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+
+/**
+ * Creates the set of properties used by a Camel CXF Sink Connector
+ */
+final class CamelSinkCXFPropertyFactory extends SinkConnectorPropertyFactory<CamelSinkCXFPropertyFactory> {
+    private CamelSinkCXFPropertyFactory() {
+        // instances are obtained through basic()
+    }
+
+    /**
+     * Entry point of the fluent API: a factory pre-populated with the connector name, class,
+     * task count and String key/value converters.
+     */
+    public static CamelSinkCXFPropertyFactory basic() {
+        return new CamelSinkCXFPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelCXFSinkConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.cxf.CamelCxfSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    /**
+     * Starts a URL-style configuration by prefixing {@code serviceUrl} with the {@code cxf://} scheme.
+     *
+     * @param serviceUrl the address (and optional query string) that follows the scheme
+     */
+    public EndpointUrlBuilder<CamelSinkCXFPropertyFactory> withUrl(String serviceUrl) {
+        final String sinkUrl = "cxf://" + serviceUrl;
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+    }
+
+    /** Sets {@code camel.sink.path.address}. */
+    public CamelSinkCXFPropertyFactory withAddress(String address) {
+        return setProperty("camel.sink.path.address", address);
+    }
+
+    /** Sets {@code camel.sink.endpoint.serviceClass}. */
+    public CamelSinkCXFPropertyFactory withServiceClass(String serviceClass) {
+        return setProperty("camel.sink.endpoint.serviceClass", serviceClass);
+    }
+
+    /** Sets {@code camel.sink.endpoint.dataFormat}. */
+    public CamelSinkCXFPropertyFactory withDataFormat(String dataFormat) {
+        return setProperty("camel.sink.endpoint.dataFormat", dataFormat);
+    }
+}
+
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
new file mode 100644
index 0000000..a5b909d
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/GreeterImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.util.logging.Logger;
+
+
+/**
+ * JAX-WS service bean published by the sink integration test. It specialises the CXF test
+ * utility greeter so that each {@code greetMe} call is logged.
+ */
+public class GreeterImpl extends org.apache.hello_world_soap_http.GreeterImpl {
+
+    private static final Logger LOG = Logger.getLogger(GreeterImpl.class.getName());
+
+    /**
+     * Logs the incoming value and answers with it prefixed by {@code "Greet "}.
+     *
+     * @param hi the text received from the caller
+     * @return {@code "Greet " + hi}
+     */
+    @Override
+    public String greetMe(String hi) {
+        LOG.info("jaxws greetMe " + hi);
+        return "Greet " + hi;
+    }
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
new file mode 100644
index 0000000..42f12f5
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.cxf.sink;
+
+import java.util.List;
+
+import org.apache.camel.kafkaconnector.cxf.source.HelloService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HelloService} implementation published by the sink integration test as the
+ * simple-frontend backing service.
+ *
+ * <p>{@code invocationCount} is a plain static counter shared by every instance and bumped
+ * without synchronization.
+ * NOTE(review): concurrent requests could lose increments; confirm this is acceptable.
+ */
+public class HelloServiceImpl implements HelloService {
+    private static final Logger LOG = LoggerFactory.getLogger(HelloServiceImpl.class);
+    public static int invocationCount = 0;
+
+    /** Suffix appended to the greeting returned by {@link #sayHello()}. */
+    private final String name;
+
+    public HelloServiceImpl(String name) {
+        this.name = name;
+    }
+
+    public HelloServiceImpl() {
+        name = "";
+    }
+
+    /**
+     * Counts the call and returns {@code text} prefixed by {@code "echo "}.
+     */
+    @Override
+    public String echo(String text) {
+        LOG.info("call for echo with {}", text);
+        invocationCount++;
+        LOG.info("invocationCount is {}", invocationCount);
+        return "echo " + text;
+    }
+
+    /** Counts the call; produces no result. */
+    @Override
+    public void ping() {
+        invocationCount++;
+        LOG.info("call for oneway ping");
+    }
+
+    /** @return the current value of the shared static counter */
+    @Override
+    public int getInvocationCount() {
+        return invocationCount;
+    }
+
+    /** @return {@code "hello"} immediately followed by the configured name */
+    @Override
+    public String sayHello() {
+        return "hello" + name;
+    }
+
+    /**
+     * Counts the call and returns the argument unchanged.
+     */
+    @Override
+    public Boolean echoBoolean(Boolean bool) {
+        LOG.info("call for echoBoolean with {}", bool);
+        invocationCount++;
+        LOG.info("invocationCount is {}", invocationCount);
+        return bool;
+    }
+
+    /**
+     * Builds {@code "param:" + par1[0] + par2[0]} when both lists carry at least one element,
+     * otherwise returns plain {@code "param"}.
+     *
+     * @param par1 first list; may be null or empty
+     * @param par2 second list; may be null or empty
+     */
+    @Override
+    public String complexParameters(List<String> par1, List<String> par2) {
+        String result = "param";
+        // empty lists are treated like missing ones instead of failing on get(0)
+        if (par1 != null && par2 != null && !par1.isEmpty() && !par2.isEmpty()) {
+            result = result + ":" + par1.get(0) + par2.get(0);
+        }
+        return result;
+    }
+
+}
\ No newline at end of file
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java new file mode 100644 index 0000000..4ddf9e8 --- /dev/null +++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingInInterceptor;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.frontend.ClientFactoryBean;
+import org.apache.cxf.frontend.ClientProxyFactoryBean;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+/**
+ * A simple test case that checks whether the CXF Consumer Endpoint produces the expected number of
+ * messages
+ */
+public class CamelSourceCXFITCase extends AbstractKafkaTest {
+
+    protected static final int PORT = NetworkUtils.getFreePort("localhost");
+    protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT
+        + "/CxfConsumerTest/test";
+    protected static final String SIMPLE_ENDPOINT_URI = SIMPLE_ENDPOINT_ADDRESS
+        + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService"
+        + "&publishedEndpointUrl=http://www.simple.com/services/test";
+
+    private static final String TEST_MESSAGE = "Hello World!";
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFITCase.class);
+
+    /** Number of records seen by {@link #checkRecord(ConsumerRecord)} in the current test. */
+    private int received;
+    /** Number of records each test expects to read from the topic. */
+    private final int expect = 1;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-cxf-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+    }
+
+    /**
+     * Consumer callback: counts the record and reports whether consumption should go on.
+     *
+     * @return {@code false} once {@code expect} records have been received, {@code true} otherwise
+     */
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+        LOG.debug("Received: {}", record.value());
+
+        received++;
+
+        return received != expect;
+    }
+
+    /**
+     * Deploys the source connector, invokes its endpoint once through a CXF client proxy and then
+     * checks that the expected number of records reached the test topic.
+     *
+     * @param connectorPropertyFactory the connector configuration to deploy
+     */
+    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+        Thread.sleep(5000);//ensure cxf source connector is up
+        ClientProxyFactoryBean proxyFactory = new ClientProxyFactoryBean();
+        ClientFactoryBean clientBean = proxyFactory.getClientFactoryBean();
+        clientBean.setAddress(SIMPLE_ENDPOINT_ADDRESS);
+        clientBean.setServiceClass(HelloService.class);
+        Bus bus = BusFactory.newInstance().createBus();
+        clientBean.setBus(bus);
+        bus.getInInterceptors().add(new LoggingInInterceptor());
+        bus.getOutInterceptors().add(new LoggingOutInterceptor());
+        HelloService client = (HelloService) proxyFactory.create();
+        try {
+            String result = client.echo(TEST_MESSAGE);
+            // JUnit 5 order is (expected, actual)
+            assertEquals(TEST_MESSAGE, result);
+        } catch (Exception e) {
+            // invocation problems are only logged; the record count below decides the outcome
+            LOG.info("Test Invocation Failure", e);
+        }
+
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        assertEquals(expect, received, "Didn't process the expected amount of messages");
+    }
+
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                    .basic()
+                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService");
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            // keep the cause so the report shows the original stack trace
+            fail(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceiveUsingUrl() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                    .basic()
+                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withUrl(SIMPLE_ENDPOINT_URI).buildUrl();
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            // keep the cause so the report shows the original stack trace
+            fail(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    @Timeout(20)
+    public void testBasicSendReceiveUsingDataFormat() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory
+                    .basic()
+                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                    .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService")
+                    .withDataFormat("POJO");
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("CXF test failed: {}", e.getMessage(), e);
+            // keep the cause so the report shows the original stack trace
+            fail(e.getMessage(), e);
+        }
+    }
+
+
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java
new file mode 100644
index 0000000..7d054e5
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFPropertyFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+
+/**
+ * Fluent builder for the properties of a Camel CXF Source Connector; start with {@link #basic()}.
+ */
+final class CamelSourceCXFPropertyFactory extends SourceConnectorPropertyFactory<CamelSourceCXFPropertyFactory> {
+    private CamelSourceCXFPropertyFactory() {
+        // instances are obtained through basic()
+    }
+
+    /**
+     * Returns a factory pre-populated with the connector name, class, task count and
+     * String key/value converters.
+     */
+    public static CamelSourceCXFPropertyFactory basic() {
+        return new CamelSourceCXFPropertyFactory()
+                .withName("CamelCXFSourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.cxf.CamelCxfSourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+    /**
+     * Starts a URL-style configuration by prefixing {@code cxfUrl} with the {@code cxf://} scheme.
+     *
+     * @param cxfUrl the address (and optional query string) that follows the scheme
+     */
+    public EndpointUrlBuilder<CamelSourceCXFPropertyFactory> withUrl(String cxfUrl) {
+        final String sourceUrl = "cxf://" + cxfUrl;
+        return new EndpointUrlBuilder<>(this::withSourceUrl, sourceUrl);
+    }
+
+    /** Sets {@code camel.source.path.address}. */
+    public CamelSourceCXFPropertyFactory withAddress(String address) {
+        return setProperty("camel.source.path.address", address);
+    }
+
+    /** Sets {@code camel.source.endpoint.serviceClass}. */
+    public CamelSourceCXFPropertyFactory withServiceClass(String serviceClass) {
+        return setProperty("camel.source.endpoint.serviceClass", serviceClass);
+    }
+
+    /** Sets {@code camel.source.endpoint.publishedEndpointUrl}. */
+    public CamelSourceCXFPropertyFactory withPublishedEndpointUrl(String publishedEndpointUrl) {
+        return setProperty("camel.source.endpoint.publishedEndpointUrl", publishedEndpointUrl);
+    }
+
+    /** Sets {@code camel.source.endpoint.dataFormat}. */
+    public CamelSourceCXFPropertyFactory withDataFormat(String dataFormat) {
+        return setProperty("camel.source.endpoint.dataFormat", dataFormat);
+    }
+
+
+}
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
new file mode 100644
index 0000000..5c4653f
--- /dev/null
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.cxf.source;
+
+import java.util.List;
+
+/**
+ * Service contract that the CXF integration tests pass to the connectors as {@code serviceClass}.
+ */
+public interface HelloService {
+    String sayHello();
+
+    void ping();
+
+    int getInvocationCount();
+
+    String echo(String text) throws Exception;
+
+    Boolean echoBoolean(Boolean bool);
+
+    String complexParameters(List<String> par1, List<String> par2);
+
+}