This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit d062ce45dcf639f2c39ca74539686d5ca530b80f Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Mon Feb 22 15:28:52 2021 +0100 Convert the CXF source test case to use the base source test class --- .../common/test/CamelSourceTestSupport.java | 2 - .../cxf/source/CamelSourceCXFITCase.java | 125 ++++++++++----------- 2 files changed, 57 insertions(+), 70 deletions(-) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java index 70eaa91..aeaecff 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java @@ -138,6 +138,4 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest { verifyMessages(consumer); LOG.debug("Verified messages"); } - - } diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java index a4327ed..b9f04c2 100644 --- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java +++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java @@ -19,27 +19,26 @@ package org.apache.camel.kafkaconnector.cxf.source; import java.util.concurrent.ExecutionException; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.cxf.client.CXFServiceUtil; import org.apache.camel.kafkaconnector.cxf.common.HelloService; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; /** * A simple test case that checks whether the CXF Consumer Endpoint produces the expected number of messages */ -public class CamelSourceCXFITCase extends AbstractKafkaTest { +public class CamelSourceCXFITCase extends CamelSourceTestSupport { protected static final int PORT = NetworkUtils.getFreePort("localhost"); protected static final String SIMPLE_ENDPOINT_ADDRESS = "http://localhost:" + PORT + "/CxfConsumerTest/test"; @@ -47,101 +46,91 @@ public class CamelSourceCXFITCase extends AbstractKafkaTest { + "?serviceClass=org.apache.camel.kafkaconnector.cxf.common.HelloService" + "&publishedEndpointUrl=http://www.simple.com/services/test"; - private static final String TEST_MESSAGE = "Hello World!"; - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFITCase.class); - private int received; - private final int expect = 1; + private final int expect = 10; @Override protected String[] getConnectorsInTest() { return new String[] {"camel-cxf-kafka-connector"}; } - @BeforeEach - public void setUp() { - received = 0; - } - - private <T> boolean checkRecord(ConsumerRecord<String, T> record) { - LOG.debug("Received: {}", record.value()); - received++; + @Override + protected void produceTestData() { + TestUtils.waitFor(() -> NetworkUtils.portIsOpen("localhost", PORT)); - if (received == expect) { - return false; - } + try { + HelloService client = CXFServiceUtil.getService(SIMPLE_ENDPOINT_ADDRESS, HelloService.class); - return true; - } + for (int i = 0; i < expect; i++) { + client.echo("Test message " + i); + } - public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) - throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - // ensure cxf source connector is up - Thread.sleep(5000); - HelloService client = CXFServiceUtil.getService(SIMPLE_ENDPOINT_ADDRESS, HelloService.class); - try { - String result = client.echo(TEST_MESSAGE); - assertEquals(result, TEST_MESSAGE); } catch (Exception e) { - LOG.info("Test Invocation Failure", e); + LOG.info("Unable to invoke service: {}", e.getMessage(), e); + fail("Unable to invoke service"); } + } + + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + LOG.info("Consumed messages: {}", consumer.consumedMessages()); - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); + for (ConsumerRecord<String, ?> record : consumer.consumedMessages()) { + Object receivedObject = consumer.consumedMessages().get(0).value(); + if (!(receivedObject instanceof String)) { + fail("Unexpected message type"); + } - assertEquals(received, expect, "Didn't process the expected amount of messages"); + String result = (String) receivedObject; + assertTrue(result.contains("Test message")); + } } + @Test @Timeout(20) - public void testBasicSendReceive() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS) - .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService"); + public void testBasicSendReceive() throws ExecutionException, InterruptedException { + String topicName = getTopicForTest(this); - runBasicStringTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("CXF test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withAddress(SIMPLE_ENDPOINT_ADDRESS) + .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService"); + + runTestBlocking(connectorPropertyFactory, topicName, expect); } @Test @Timeout(20) - public void testBasicSendReceiveUsingUrl() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(SIMPLE_ENDPOINT_URI) - .buildUrl(); + public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException { + String topicName = getTopicForTest(this); - runBasicStringTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("CXF test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withUrl(SIMPLE_ENDPOINT_URI) + .buildUrl(); + + runTestBlocking(connectorPropertyFactory, topicName, expect); } @Test @Timeout(20) - public void testBasicSendReceiveUsingDataFormat() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS) - .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService").withDataFormat("POJO"); + public void testBasicSendReceiveUsingDataFormat() throws ExecutionException, InterruptedException { + String topicName = getTopicForTest(this); - runBasicStringTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("CXF test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withAddress(SIMPLE_ENDPOINT_ADDRESS) + .withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService") + .withDataFormat("POJO"); + + runTestBlocking(connectorPropertyFactory, topicName, expect); } }