This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 61a7234  Added a test to make sure we retain original exception data on errors
61a7234 is described below

commit 61a7234374568385277a1729aea0b6242d38ec8a
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Fri Jan 15 14:45:29 2021 +0100

    Added a test to make sure we retain original exception data on errors
---
 .../kafkaconnect/KafkaConnectEmbedded.java         |  10 ++
 .../services/kafkaconnect/KafkaConnectRunner.java  |   9 ++
 .../kafkaconnect/KafkaConnectRunnerService.java    |   6 +
 .../services/kafkaconnect/KafkaConnectService.java |   4 +
 .../sjms2/sink/CamelSinkJMSStartupITCase.java      | 130 +++++++++++++++++++++
 5 files changed, 159 insertions(+)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index c53a568..bc6b868 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector.common.services.kafkaconnect;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
@@ -99,4 +100,24 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
     public void start() {
         // NO-OP
     }
+
+    /**
+     * Looks up the status that the cluster reports for a connector.
+     *
+     * @param connectorName the name of the connector to look up
+     * @return the state information the cluster returns for that connector
+     */
+    private ConnectorStateInfo getConnectorStatus(String connectorName) {
+        return cluster.connectorStatus(connectorName);
+    }
+
+    /**
+     * Passes the status of every connector listed by the cluster to the given consumer.
+     *
+     * @param taskStateConsumer callback invoked once per connector with its state information
+     */
+    @Override
+    public void connectorStateCheck(Consumer<ConnectorStateInfo> taskStateConsumer) {
+        cluster.connectors().forEach(c -> taskStateConsumer.accept(getConnectorStatus(c)));
+    }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
index d626c44..d5affd0 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
@@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
@@ -227,4 +228,23 @@ class KafkaConnectRunner {
             LOG.warn("Trying to stop an uninitialized Kafka Connect Runner");
         }
     }
+
+    /**
+     * Looks up the status that the herder reports for a connector.
+     *
+     * @param connectorName the name of the connector to look up
+     * @return the state information the herder returns for that connector
+     */
+    private ConnectorStateInfo getConnectorStatus(String connectorName) {
+        return herder.connectorStatus(connectorName);
+    }
+
+    /**
+     * Passes the status of every connector listed by the herder to the given consumer.
+     *
+     * @param taskStateConsumer callback invoked once per connector with its state information
+     */
+    public void connectorStateCheck(Consumer<ConnectorStateInfo> taskStateConsumer) {
+        herder.connectors().forEach(c -> taskStateConsumer.accept(getConnectorStatus(c)));
+    }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
index 4c623a6..0c48bb9 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java
@@ -24,10 +24,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,4 +119,14 @@ public class KafkaConnectRunnerService implements KafkaConnectService {
             LOG.error("The test was interrupted while executing");
         }
     }
+
+    /**
+     * Delegates the connector state check to the Kafka Connect runner.
+     *
+     * @param taskStateConsumer callback that receives the state information of the connectors
+     */
+    @Override
+    public void connectorStateCheck(Consumer<ConnectorStateInfo> taskStateConsumer) {
+        kafkaConnectRunner.connectorStateCheck(taskStateConsumer);
+    }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectService.java
index 5ae9e07..878800f 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectService.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectService.java
@@ -18,8 +18,10 @@
 package org.apache.camel.kafkaconnector.common.services.kafkaconnect;
 
 import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
 import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -32,6 +34,13 @@ public interface KafkaConnectService extends BeforeTestExecutionCallback, AfterT
     void stop();
     void start();
 
+    /**
+     * Passes the state information of the connectors handled by this service to the given consumer.
+     *
+     * @param taskStateConsumer callback that receives the state information of a connector
+     */
+    void connectorStateCheck(Consumer<ConnectorStateInfo> taskStateConsumer);
+
     @Override
     default void afterTestExecution(ExtensionContext extensionContext) {
         stop();
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
new file mode 100644
index 0000000..f2dac55
--- /dev/null
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sjms2.sink;
+
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * A simple test to make sure we are not losing or hiding exception data on errors.
+ * <p>
+ * The connector is configured with a connection factory that points to an invalid address, and the test
+ * checks that the trace reported for the failed task still contains the Camel error message.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSinkJMSStartupITCase extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSStartupITCase.class);
+
+    // Name of the task state that identifies a running task
+    private static final String RUNNING_STATE = "RUNNING";
+    // How many times the connector state is polled before giving up
+    private static final int MAX_STATE_CHECKS = 25;
+
+    // Written by the state-check callback on every poll
+    private boolean running;
+    private String trace;
+
+    /**
+     * Creates connection properties that configure the sjms2 component with a connection factory
+     * pointing to an invalid remote URI, so that the connector cannot start correctly.
+     *
+     * @return the broken connection properties
+     */
+    private Properties connectionProperties() {
+        Properties properties = new Properties();
+
+        properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+        properties.put("camel.component.sjms2.connection-factory.remoteURI", "tcp://invalid");
+
+        return properties;
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-sjms2-kafka-connector"};
+    }
+
+    /**
+     * Callback for the connector state check: records whether all the tasks of the connector are running.
+     *
+     * @param connectorStateInfo the state information of the connector being checked
+     */
+    private void connectorStateCheck(ConnectorStateInfo connectorStateInfo) {
+        LOG.debug("Checking state for {}", connectorStateInfo.name());
+        running = connectorStateInfo.tasks().stream().allMatch(this::isRunning);
+    }
+
+    /**
+     * Tells whether a task is running and, when it is not, keeps the trace it reports.
+     *
+     * @param t the state of the task
+     * @return true if the task is in the running state
+     */
+    private boolean isRunning(ConnectorStateInfo.TaskState t) {
+        boolean isRunningState = RUNNING_STATE.equals(t.state());
+        if (!isRunningState) {
+            trace = t.trace();
+        }
+
+        return isRunningState;
+    }
+
+    /**
+     * Initializes the connector with the given properties and produces a message to the test topic.
+     */
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message ");
+    }
+
+    /**
+     * Polls the connector state until a task is no longer running (or the checks are exhausted) and then
+     * verifies that the failed task reports the Camel error in its trace.
+     */
+    private void checkThatFailed() throws InterruptedException {
+        // Reset the polled state: the default value of the flag (false) would otherwise be read as a failure
+        // if the first poll does not find any connector to report on
+        running = true;
+        trace = null;
+
+        int i = MAX_STATE_CHECKS;
+        do {
+            getKafkaConnectService().connectorStateCheck(this::connectorStateCheck);
+            i--;
+
+            if (i > 0 && running) {
+                Thread.sleep(Duration.ofSeconds(1).toMillis());
+            }
+        } while (i > 0 && running);
+
+        assertFalse(running, "The connector should be in a failed state");
+
+        LOG.trace("Trace reported by the task: {}", trace);
+        assertNotNull(trace, "The failed task should report a trace");
+        assertTrue(trace.contains("Failed to resolve endpoint"),
+                "Trace should contain a Camel error message");
+    }
+
+    @Test
+    @Timeout(30)
+    public void testStartup() {
+        try {
+            Properties brokenProp = connectionProperties();
+
+            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withConnectionProperties(brokenProp)
+                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .withDeadLetterQueueTopicName("dlq-sink-topic");
+
+            // Inject an invalid configuration and check that fails
+            runTest(connectorPropertyFactory);
+
+            checkThatFailed();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before reporting the failure
+            Thread.currentThread().interrupt();
+            LOG.error("JMS test interrupted: {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            // Attach the cause so that the original exception is not lost in the test report
+            fail(e.getMessage(), e);
+        }
+    }
+}

Reply via email to