This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master-alignment in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 31d19da3690a399e622bec67077f344e2b83b8f2 Author: Freeman(Yue) Fang <freeman.f...@gmail.com> AuthorDate: Thu Mar 18 10:23:49 2021 -0400 add itests for camel-cxfrs-kafka-connector #996 (#1099) * add itests for camel-cxfrs-kafka-connector #996 * NetworkUtils should be able to hold a free port for later usage #1109 * address feedback * #1109 address PR review feedback (cherry picked from commit 94929f308ae7a155a6b3d0de23657bff11f41fde) --- tests/itests-cxfrs/pom.xml | 114 +++++++++++ .../camel/kafkaconnector/cxfrs/Customer.java | 79 +++++++ .../cxfrs/CustomerServiceResource.java | 49 +++++ .../cxfrs/source/CamelSourceCXFRSITCase.java | 228 +++++++++++++++++++++ .../source/CamelSourceCXFRSPropertyFactory.java | 63 ++++++ .../kafkaconnector/cxfrs/source/HelloService.java | 35 ++++ tests/pom.xml | 1 + 7 files changed, 569 insertions(+) diff --git a/tests/itests-cxfrs/pom.xml b/tests/itests-cxfrs/pom.xml new file mode 100644 index 0000000..f8f8fed --- /dev/null +++ b/tests/itests-cxfrs/pom.xml @@ -0,0 +1,114 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-cxfrs</artifactId> + <name>Camel-Kafka-Connector :: Tests :: CXF RS</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-common</artifactId> + <version>${camel.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-dispatch-router</artifactId> + <version>${camel.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cxf</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-transports-http-jetty</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-security</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-continuation</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-testutils</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-extension-providers</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.codehaus.jettison</groupId> + <artifactId>jettison</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + +</project> diff --git a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/Customer.java b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/Customer.java new file mode 100644 index 0000000..fee5122 --- /dev/null +++ b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/Customer.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.cxfrs; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.camel.util.ObjectHelper; + +/** + * + * @version + */ +@XmlRootElement(name = "Customer") +public class Customer { + private long id; + private String name; + + public Customer() { + } + + public Customer(long id, String name) { + setId(id); + setName(name); + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (id ^ (id >>> 32)); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Customer)) { + return false; + } + + if (this == obj) { + return true; + } + + Customer other = (Customer) obj; + return id == other.id && ObjectHelper.equal(name, other.name); + } + +} diff --git a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/CustomerServiceResource.java b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/CustomerServiceResource.java new file mode 100644 index 0000000..1de876e --- /dev/null +++ b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/CustomerServiceResource.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.cxfrs; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Response; + + +// START SNIPPET: example +@Path("/customerservice/") +public interface CustomerServiceResource { + + @GET + @Path("/customers/{id}/") + Customer getCustomer(@PathParam("id") String id); + + @PUT + @Path("/customers/") + Response updateCustomer(Customer customer); + + @Path("/{id}") + @PUT() + @Consumes({ "application/xml", "text/plain", + "application/json" }) + @Produces({ "application/xml", "text/plain", + "application/json" }) + Object invoke(@PathParam("id") String id, + String payload); +} +// END SNIPPET: example diff --git a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java new file mode 100644 index 0000000..789df85 --- /dev/null +++ b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.cxfrs.source; + +import java.util.concurrent.ExecutionException; + +import javax.servlet.ServletRequest; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.cxf.common.message.CxfConstants; +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; +import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.cxfrs.Customer; +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.ext.logging.LoggingInInterceptor; +import org.apache.cxf.ext.logging.LoggingOutInterceptor; +import org.apache.cxf.jaxrs.impl.ResponseImpl; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + + +/** + * A simple test case that checks whether the CXF RS Consumer Endpoint produces the expected number of + * messages + */ +public class CamelSourceCXFRSITCase extends CamelSourceTestSupport { + + protected static final String LOCALHOST = NetworkUtils.getHostname(); + protected static final int PORT = NetworkUtils.getFreePort(LOCALHOST); + protected static final String CXT = PORT + "/CxfRsConsumerTest"; + protected static final String CXF_RS_ENDPOINT_ADDRESS = "http://" + LOCALHOST + ":" + CXT + "/rest"; + protected static final String CXF_RS_ENDPOINT_URI = CXF_RS_ENDPOINT_ADDRESS + + "?resourceClasses=org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource"; + + + + private static String[] receivedValue = {"[126]", "[123]", "[400]"}; + + + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFRSITCase.class); + + private int received; + private final int expect = 3; + + + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-cxfrs-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + received = 0; + + } + + + + @Override + protected void produceTestData() { + TestUtils.waitFor(() -> NetworkUtils.portIsOpen(LOCALHOST, PORT)); + + try { + Bus bus = BusFactory.newInstance().createBus(); + + bus.getInInterceptors().add(new LoggingInInterceptor()); + bus.getOutInterceptors().add(new LoggingOutInterceptor()); + try { + doTestGetCustomer("rest"); + } catch (Exception e) { + LOG.info("Test Invocation Failure", e); + } + + + } catch (Exception e) { + LOG.info("Unable to invoke service: {}", e.getMessage(), e); + fail("Unable to invoke service"); + } + } + + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + LOG.info("Consumed messages: {}", consumer.consumedMessages()); + + for (ConsumerRecord<String, ?> record : consumer.consumedMessages()) { + Object receivedObject = consumer.consumedMessages().get(received).value(); + if (!(receivedObject instanceof String)) { + fail("Unexpected message type"); + } + + String result = (String) receivedObject; + assertEquals(receivedValue[received++], result); + + + } + } + + + + @Test + @Timeout(20) + public void testBasicSendReceive() { + try { + String topicName = getTopicForTest(this); + ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFRSPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withAddress(CXF_RS_ENDPOINT_ADDRESS) + .withResourceClass("org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource"); + + + + runTestBlocking(connectorPropertyFactory, topicName, expect); + } catch (Exception e) { + LOG.error("CXF test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } + + @Test + @Timeout(20) + public void testBasicSendReceiveWithoutProcessor() { + try { + + String topicName = getTopicForTest(this); + ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFRSPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withAddress(CXF_RS_ENDPOINT_ADDRESS) + .withResourceClass("org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource"); + + + + runTestBlocking(connectorPropertyFactory, topicName, expect); + } catch (Exception e) { + LOG.error("CXF test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } + + @Test + @Timeout(20) + public void testBasicSendReceiveUsingUrl() { + try { + String topicName = getTopicForTest(this); + ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFRSPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withUrl(CXF_RS_ENDPOINT_URI).buildUrl(); + + + + runTestBlocking(connectorPropertyFactory, topicName, expect); + } catch (Exception e) { + LOG.error("CXF test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } + + + + private void invokeGetCustomer(String uri, String expect) throws Exception { + HttpGet get = new HttpGet(uri); + get.addHeader("Accept", "application/json"); + CloseableHttpClient httpclient = HttpClientBuilder.create().build(); + + try { + HttpResponse response = httpclient.execute(get); + + } finally { + httpclient.close(); + } + } + + private void doTestGetCustomer(String contextUri) throws Exception { + invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/126", + "{\"Customer\":{\"id\":126,\"name\":\"CKC\"}}"); + invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/123", + "customer response back!"); + invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/400", + "The remoteAddress is 127.0.0.1"); + + } + +} diff --git a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSPropertyFactory.java b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSPropertyFactory.java new file mode 100644 index 0000000..463fa17 --- /dev/null +++ b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSPropertyFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.cxfrs.source; + + +import org.apache.camel.kafkaconnector.CamelConnectorConfig; +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; + + +/** + * Creates the set of properties used by a Camel CXF Source Connector + */ +final class CamelSourceCXFRSPropertyFactory extends SourceConnectorPropertyFactory<CamelSourceCXFRSPropertyFactory> { + private CamelSourceCXFRSPropertyFactory() { + + } + + public CamelSourceCXFRSPropertyFactory withAddress(String address) { + return setProperty("camel.source.path.address", address); + } + + public CamelSourceCXFRSPropertyFactory withResourceClass(String resourceClasses) { + return setProperty("camel.source.endpoint.resourceClasses", resourceClasses); + } + + public CamelSourceCXFRSPropertyFactory withPublishedEndpointUrl(String publishedEndpointUrl) { + return setProperty("camel.source.endpoint.publishedEndpointUrl", publishedEndpointUrl); + } + + + + public static CamelSourceCXFRSPropertyFactory basic() { + return new CamelSourceCXFRSPropertyFactory() + .withName("CamelCXFRSSourceConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.cxfrs.CamelCxfrsSourceConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } + + public EndpointUrlBuilder<CamelSourceCXFRSPropertyFactory> withUrl(String cxfUrl) { + String url = String.format("cxfrs://%s", cxfUrl); + return new EndpointUrlBuilder<>(this::withSourceUrl, url); + } + + +} diff --git a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/HelloService.java b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/HelloService.java new file mode 100644 index 0000000..72cbd7d --- /dev/null +++ b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/HelloService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.cxfrs.source; + +import java.util.List; + +public interface HelloService { + String sayHello(); + + void ping(); + + int getInvocationCount(); + + String echo(String text) throws Exception; + + Boolean echoBoolean(Boolean bool); + + String complexParameters(List<String> par1, List<String> par2); + +} diff --git a/tests/pom.xml b/tests/pom.xml index 4009424..612f57b 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -62,6 +62,7 @@ <module>itests-ssh</module> <module>itests-sql</module> <module>itests-cxf</module> + <module>itests-cxfrs</module> <module>itests-netty</module> <module>itests-google-pubsub</module> </modules>