This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 633931b5f5494bae20830de5daa5a83f38409c40 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Fri Feb 19 14:05:29 2021 +0100 Convert the CXF sink test case to use the base Sink test class - Move common test code to a common package - Remove exception handling code - Move the request to separate files --- tests/itests-cxf/pom.xml | 5 + .../cxf/{source => common}/HelloService.java | 2 +- .../cxf/sink/CamelSinkCXFITCase.java | 122 ++++++++++----------- .../kafkaconnector/cxf/sink/HelloServiceImpl.java | 2 +- .../sink/SinkServerFactoryBeanConfigurator.java | 2 +- .../cxf/source/CamelSourceCXFITCase.java | 7 +- .../src/test/resources/hello-service-test.xml | 26 +++++ tests/itests-cxf/src/test/resources/jaxws-test.xml | 26 +++++ 8 files changed, 121 insertions(+), 71 deletions(-) diff --git a/tests/itests-cxf/pom.xml b/tests/itests-cxf/pom.xml index fc353ea..1882492 100644 --- a/tests/itests-cxf/pom.xml +++ b/tests/itests-cxf/pom.xml @@ -94,6 +94,11 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + </dependencies> diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java similarity index 95% rename from tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java rename to tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java index 5c4653f..90429a6 100644 --- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/HelloService.java +++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/common/HelloService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.camel.kafkaconnector.cxf.source; +package org.apache.camel.kafkaconnector.cxf.common; import java.util.List; diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java index 68bb9e7..6690341 100644 --- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java +++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java @@ -17,17 +17,18 @@ package org.apache.camel.kafkaconnector.cxf.sink; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.cxf.services.CXFEmbeddedServerService; import org.apache.camel.kafkaconnector.cxf.services.CXFService; import org.apache.camel.kafkaconnector.cxf.services.JaxWsServiceConfigurator; +import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -39,27 +40,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -public class CamelSinkCXFITCase extends AbstractKafkaTest { - protected static final String TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">" - + "<soap:Body><ns1:echo xmlns:ns1=\"http://source.cxf.kafkaconnector.camel.apache.org/\">" - + "<arg0 xmlns=\"http://source.cxf.kafkaconnector.camel.apache.org/\">hello world</arg0>" - + "</ns1:echo></soap:Body></soap:Envelope>"; - - protected static final String JAXWS_TEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\"\n" - + " + \"<soap:Body><ns1:greetMe xmlns:ns1=\"http://apache.org/hello_world_soap_http/types\">\"\n" - + " + \"<requestType xmlns=\"http://apache.org/hello_world_soap_http/types\">hello world!</requestType>\"\n" - + " + \"</ns1:greetMe></soap:Body></soap:Envelope>"; - +public class CamelSinkCXFITCase extends CamelSinkTestSupport { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCXFITCase.class); - + private SinkServerFactoryBeanConfigurator serverFactoryBeanConfigurator = new SinkServerFactoryBeanConfigurator(); private JaxWsServiceConfigurator jaxWsServiceConfigurator = new SinkJaxWsServiceConfigurator(); @RegisterExtension public CXFService service = new CXFEmbeddedServerService(serverFactoryBeanConfigurator, jaxWsServiceConfigurator); - - private final int expect = 10; private String topicName; @@ -74,17 +63,35 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest { GreeterImpl.outputFile().delete(); } + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + // NO-OP (the messages are consumed on each service implementation) + Thread.sleep(5000); + } catch (Exception e) { + LOG.warn("Interrupted"); + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + // NO-OP (specific for each) + if (!latch.await(30, TimeUnit.SECONDS)) { + fail("Failed to receive the messages within the specified time: received %d of %d"); + } + } + private void putRecords(String message, int count) { KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - for (int i = 0; i < count; i++) { - try { + try { + for (int i = 0; i < count; i++) { kafkaClient.produce(topicName, message); - } catch (ExecutionException e) { - LOG.error("Unable to produce messages: {}", e.getMessage(), e); - } catch (InterruptedException e) { - break; } + } catch (Exception e) { + fail(e.getMessage()); } } @@ -92,54 +99,39 @@ public class CamelSinkCXFITCase extends AbstractKafkaTest { return service.getJaxWsServerAddress() + "?serviceClass=org.apache.hello_world_soap_http.Greeter"; } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message, int count) - throws ExecutionException, InterruptedException, TimeoutException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); - ExecutorService service = Executors.newCachedThreadPool(); - Runnable r = () -> this.putRecords(message, count); - service.submit(r); - Thread.sleep(5000); - LOG.debug("Created the consumer ... About to receive messages"); - } - @Test - public void testBasicSendReceiveUsingUrl() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory - .basic() - .withName("CamelCXFSinkConnector") - .withTopics(topicName) - .withAddress(service.getSimpleServerAddress()) - .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService") - .withDataFormat("RAW"); + public void testBasicSendReceiveUsingUrl() throws Exception { + InputStream stream = this.getClass().getResource("/hello-service-test.xml").openStream(); + String testMessage = IOUtils.toString(stream, Charset.defaultCharset()); - runTest(connectorPropertyFactory, TEST_MESSAGE, expect); + ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory + .basic() + .withName("CamelCXFSinkConnector") + .withTopics(topicName) + .withAddress(service.getSimpleServerAddress()) + .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService") + .withDataFormat("RAW"); - assertEquals(expect, serverFactoryBeanConfigurator.getInvocationCount()); - } catch (Exception e) { - LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e); - fail(e.getMessage(), e); - } + runTest(connectorPropertyFactory, () -> putRecords(testMessage, expect)); + + assertEquals(expect, serverFactoryBeanConfigurator.getInvocationCount()); } @Test @Timeout(90) - public void testJaxWsBasicSendReceiveUsingUrl() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory - .basic() - .withName("CamelCXFSinkConnectorUrl") - .withTopics(topicName) - .withAddress(getJaxwsEndpointUri()) - .withDataFormat("RAW"); + public void testJaxWsBasicSendReceiveUsingUrl() throws Exception { + InputStream stream = this.getClass().getResource("/jaxws-test.xml").openStream(); + String testMessage = IOUtils.toString(stream, Charset.defaultCharset()); - runTest(connectorPropertyFactory, JAXWS_TEST_MESSAGE, 1); + ConnectorPropertyFactory connectorPropertyFactory = CamelSinkCXFPropertyFactory + .basic() + .withName("CamelCXFSinkConnectorUrl") + .withTopics(topicName) + .withAddress(getJaxwsEndpointUri()) + .withDataFormat("RAW"); - assertTrue(GreeterImpl.outputFile().exists(), "The test output file was not created"); - } catch (Exception e) { - LOG.error("CXF Sink test failed: {} {}", e.getMessage(), e); - fail(e.getMessage(), e); - } + runTest(connectorPropertyFactory, () -> putRecords(testMessage, 1)); + + assertTrue(GreeterImpl.outputFile().exists(), "The test output file was not created"); } } \ No newline at end of file diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java index 88f1f12..3aeed6d 100644 --- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java +++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/HelloServiceImpl.java @@ -18,7 +18,7 @@ package org.apache.camel.kafkaconnector.cxf.sink; import java.util.List; -import org.apache.camel.kafkaconnector.cxf.source.HelloService; +import org.apache.camel.kafkaconnector.cxf.common.HelloService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java index 28860da..e410d09 100644 --- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java +++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/SinkServerFactoryBeanConfigurator.java @@ -16,8 +16,8 @@ */ package org.apache.camel.kafkaconnector.cxf.sink; +import org.apache.camel.kafkaconnector.cxf.common.HelloService; import org.apache.camel.kafkaconnector.cxf.services.ServerFactoryBeanConfigurator; -import org.apache.camel.kafkaconnector.cxf.source.HelloService; import org.apache.cxf.frontend.ServerFactoryBean; class SinkServerFactoryBeanConfigurator implements ServerFactoryBeanConfigurator { diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java index 9a75ea4..a4327ed 100644 --- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java +++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java @@ -25,6 +25,7 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.cxf.client.CXFServiceUtil; +import org.apache.camel.kafkaconnector.cxf.common.HelloService; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -43,7 +44,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest { protected static final int PORT = NetworkUtils.getFreePort("localhost"); protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT + "/CxfConsumerTest/test"; protected static final String SIMPLE_ENDPOINT_URI = SIMPLE_ENDPOINT_ADDRESS - + "?serviceClass=org.apache.camel.kafkaconnector.cxf.source.HelloService" + + "?serviceClass=org.apache.camel.kafkaconnector.cxf.common.HelloService" + "&publishedEndpointUrl=http://www.simple.com/services/test"; private static final String TEST_MESSAGE = "Hello World!"; @@ -104,7 +105,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest { try { ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic() .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS) - .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService"); + .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService"); runBasicStringTest(connectorPropertyFactory); } catch (Exception e) { @@ -134,7 +135,7 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest { try { ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic() .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS) - .withServiceClass("org.apache.camel.kafkaconnector.cxf.source.HelloService").withDataFormat("POJO"); + .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService").withDataFormat("POJO"); runBasicStringTest(connectorPropertyFactory); } catch (Exception e) { diff --git a/tests/itests-cxf/src/test/resources/hello-service-test.xml b/tests/itests-cxf/src/test/resources/hello-service-test.xml new file mode 100644 index 0000000..9405a47 --- /dev/null +++ b/tests/itests-cxf/src/test/resources/hello-service-test.xml @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> + <soap:Body> + <ns1:echo xmlns:ns1="http://source.cxf.kafkaconnector.camel.apache.org/"> + <arg0 xmlns="http://source.cxf.kafkaconnector.camel.apache.org/">hello world</arg0> + </ns1:echo> + </soap:Body> +</soap:Envelope> \ No newline at end of file diff --git a/tests/itests-cxf/src/test/resources/jaxws-test.xml b/tests/itests-cxf/src/test/resources/jaxws-test.xml new file mode 100644 index 0000000..02b330c --- /dev/null +++ b/tests/itests-cxf/src/test/resources/jaxws-test.xml @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> + <soap:Body> + <ns1:greetMe xmlns:ns1="http://apache.org/hello_world_soap_http/types"> + <requestType xmlns="http://apache.org/hello_world_soap_http/types">hello world!</requestType> + </ns1:greetMe> + </soap:Body> +</soap:Envelope> \ No newline at end of file