This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d062ce45dcf639f2c39ca74539686d5ca530b80f
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Mon Feb 22 15:28:52 2021 +0100

    Convert the CXF source test case to use the base source test class
---
 .../common/test/CamelSourceTestSupport.java        |   2 -
 .../cxf/source/CamelSourceCXFITCase.java           | 125 ++++++++++-----------
 2 files changed, 57 insertions(+), 70 deletions(-)

diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 70eaa91..aeaecff 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -138,6 +138,4 @@ public abstract class CamelSourceTestSupport extends 
AbstractKafkaTest {
         verifyMessages(consumer);
         LOG.debug("Verified messages");
     }
-
-
 }
diff --git 
a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
 
b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
index a4327ed..b9f04c2 100644
--- 
a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
+++ 
b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -19,27 +19,26 @@ package org.apache.camel.kafkaconnector.cxf.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.cxf.client.CXFServiceUtil;
 import org.apache.camel.kafkaconnector.cxf.common.HelloService;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * A simple test case that checks whether the CXF Consumer Endpoint produces 
the expected number of messages
  */
-public class CamelSourceCXFITCase extends AbstractKafkaTest {
+public class CamelSourceCXFITCase extends CamelSourceTestSupport {
 
     protected static final int PORT = NetworkUtils.getFreePort("localhost");
     protected static final String SIMPLE_ENDPOINT_ADDRESS = 
"http://localhost:" + PORT + "/CxfConsumerTest/test";
@@ -47,101 +46,91 @@ public class CamelSourceCXFITCase extends 
AbstractKafkaTest {
             + 
"?serviceClass=org.apache.camel.kafkaconnector.cxf.common.HelloService"
             + "&publishedEndpointUrl=http://www.simple.com/services/test";
 
-    private static final String TEST_MESSAGE = "Hello World!";
-
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceCXFITCase.class);
 
-    private int received;
-    private final int expect = 1;
+    private final int expect = 10;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-cxf-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
-        received = 0;
-    }
-
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
 
-        received++;
+    @Override
+    protected void produceTestData() {
+        TestUtils.waitFor(() -> NetworkUtils.portIsOpen("localhost", PORT));
 
-        if (received == expect) {
-            return false;
-        }
+        try {
+            HelloService client = 
CXFServiceUtil.getService(SIMPLE_ENDPOINT_ADDRESS, HelloService.class);
 
-        return true;
-    }
+            for (int i = 0; i < expect; i++) {
+                client.echo("Test message " + i);
+            }
 
-    public void runBasicStringTest(ConnectorPropertyFactory 
connectorPropertyFactory)
-            throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
-        // ensure cxf source connector is up
-        Thread.sleep(5000);
-        HelloService client = 
CXFServiceUtil.getService(SIMPLE_ENDPOINT_ADDRESS, HelloService.class);
-        try {
-            String result = client.echo(TEST_MESSAGE);
-            assertEquals(result, TEST_MESSAGE);
         } catch (Exception e) {
-            LOG.info("Test Invocation Failure", e);
+            LOG.info("Unable to invoke service: {}", e.getMessage(), e);
+            fail("Unable to invoke service");
         }
+    }
+
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        LOG.info("Consumed messages: {}", consumer.consumedMessages());
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
-        LOG.debug("Created the consumer ...");
+        for (ConsumerRecord<String, ?> record : consumer.consumedMessages()) {
+            Object receivedObject = consumer.consumedMessages().get(0).value();
+            if (!(receivedObject instanceof String)) {
+                fail("Unexpected message type");
+            }
 
-        assertEquals(received, expect, "Didn't process the expected amount of 
messages");
+            String result = (String) receivedObject;
+            assertTrue(result.contains("Test message"));
+        }
     }
 
+
     @Test
     @Timeout(20)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = 
CamelSourceCXFPropertyFactory.basic()
-                    
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
-                    
.withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService");
+    public void testBasicSendReceive() throws ExecutionException, 
InterruptedException {
+        String topicName = getTopicForTest(this);
 
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("CXF test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        ConnectorPropertyFactory connectorPropertyFactory = 
CamelSourceCXFPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                
.withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(20)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = 
CamelSourceCXFPropertyFactory.basic()
-                    
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(SIMPLE_ENDPOINT_URI)
-                    .buildUrl();
+    public void testBasicSendReceiveUsingUrl() throws ExecutionException, 
InterruptedException {
+        String topicName = getTopicForTest(this);
 
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("CXF test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        ConnectorPropertyFactory connectorPropertyFactory = 
CamelSourceCXFPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withUrl(SIMPLE_ENDPOINT_URI)
+                .buildUrl();
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(20)
-    public void testBasicSendReceiveUsingDataFormat() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = 
CamelSourceCXFPropertyFactory.basic()
-                    
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withAddress(SIMPLE_ENDPOINT_ADDRESS)
-                    
.withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService").withDataFormat("POJO");
+    public void testBasicSendReceiveUsingDataFormat() throws 
ExecutionException, InterruptedException {
+        String topicName = getTopicForTest(this);
 
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("CXF test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        ConnectorPropertyFactory connectorPropertyFactory = 
CamelSourceCXFPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withAddress(SIMPLE_ENDPOINT_ADDRESS)
+                
.withServiceClass("org.apache.camel.kafkaconnector.cxf.common.HelloService")
+                .withDataFormat("POJO");
+
+        runTestBlocking(connectorPropertyFactory, topicName, expect);
     }
 
 }

Reply via email to