This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 61a7234 Added a test to make sure we retain original exception data on errors 61a7234 is described below commit 61a7234374568385277a1729aea0b6242d38ec8a Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Fri Jan 15 14:45:29 2021 +0100 Added a test to make sure we retain original exception data on errors --- .../kafkaconnect/KafkaConnectEmbedded.java | 10 ++ .../services/kafkaconnect/KafkaConnectRunner.java | 9 ++ .../kafkaconnect/KafkaConnectRunnerService.java | 6 + .../services/kafkaconnect/KafkaConnectService.java | 4 + .../sjms2/sink/CamelSinkJMSStartupITCase.java | 130 +++++++++++++++++++++ 5 files changed, 159 insertions(+) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java index c53a568..bc6b868 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java @@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector.common.services.kafkaconnect; import java.util.HashMap; import java.util.Map; +import java.util.function.Consumer; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService; @@ -99,4 +100,13 @@ public class KafkaConnectEmbedded implements KafkaConnectService { public void start() { // NO-OP } + + private ConnectorStateInfo getConnectorStatus(String connectorName) { + return cluster.connectorStatus(connectorName); + } + + + public void connectorStateCheck(Consumer<ConnectorStateInfo> taskStateConsumer) { + cluster.connectors().forEach(c -> taskStateConsumer.accept(getConnectorStatus(c))); + } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java index d626c44..d5affd0 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java @@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.WorkerInfo; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.FileOffsetBackingStore; @@ -227,4 +228,12 @@ class KafkaConnectRunner { LOG.warn("Trying to stop an uninitialized Kafka Connect Runner"); } } + + private ConnectorStateInfo getConnectorStatus(String connectorName) { + return herder.connectorStatus(connectorName); + } + + public void connectorStateCheck(Consumer<ConnectorStateInfo> taskStateConsumer) { + herder.connectors().forEach(c -> taskStateConsumer.accept(getConnectorStatus(c))); + } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java index 4c623a6..0c48bb9 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunnerService.java @@ -24,10 +24,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.test.infra.kafka.services.KafkaService; import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,4 +119,8 @@ public class KafkaConnectRunnerService implements KafkaConnectService { LOG.error("The test was interrupted while executing"); } } + + public void connectorStateCheck(Consumer<ConnectorStateInfo> taskStateConsumer) { + kafkaConnectRunner.connectorStateCheck(taskStateConsumer); + } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectService.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectService.java index 5ae9e07..878800f 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectService.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectService.java @@ -18,8 +18,10 @@ package org.apache.camel.kafkaconnector.common.services.kafkaconnect; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.junit.jupiter.api.extension.AfterTestExecutionCallback; import org.junit.jupiter.api.extension.BeforeTestExecutionCallback; import org.junit.jupiter.api.extension.ExtensionContext; @@ -32,6 +34,8 @@ public interface KafkaConnectService extends BeforeTestExecutionCallback, AfterT void stop(); void start(); + void connectorStateCheck(Consumer<ConnectorStateInfo> taskStateConsumer); + @Override default void afterTestExecution(ExtensionContext extensionContext) { stop(); diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java new file mode 100644 index 0000000..f2dac55 --- /dev/null +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sjms2.sink; + +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * A simple test to make sure we are not losing or hiding exception data on errors + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class CamelSinkJMSStartupITCase extends AbstractKafkaTest { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSStartupITCase.class); + + private boolean running; + private String trace; + + + private Properties connectionProperties() { + Properties properties = new Properties(); + + properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory"); + properties.put("camel.component.sjms2.connection-factory.remoteURI", "tcp://invalid"); + + return properties; + } + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-sjms2-kafka-connector"}; + } + + private void connectorStateCheck(ConnectorStateInfo connectorStateInfo) { + LOG.debug("Checking state for {}", connectorStateInfo.name()); + running = connectorStateInfo.tasks().stream().allMatch(t -> isRunning(t)); + + } + + private boolean isRunning(ConnectorStateInfo.TaskState t) { + boolean isRunningState = t.state().equals("RUNNING"); + if (!isRunningState) { + trace = t.trace(); + } + + return isRunningState; + } + + private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + getKafkaConnectService().initializeConnector(connectorPropertyFactory); + + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message "); + } + + private void checkThatFailed() throws InterruptedException { + int i = 25; + do { + kafkaConnectService.connectorStateCheck(this::connectorStateCheck); + i--; + + if (i > 0 && running) { + Thread.sleep(Duration.ofSeconds(1).toMillis()); + } + } while (i > 0 && running); + + assertFalse(running, "The connector should be in a failed state"); + + LOG.trace(trace); + assertTrue(trace.contains("Failed to resolve endpoint"), + "Trace should contain a Camel error message"); + } + + + @Test + @Timeout(30) + public void testStartup() { + try { + Properties brokenProp = connectionProperties(); + + ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory + .basic() + .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) + .withConnectionProperties(brokenProp) + .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE) + .withDeadLetterQueueTopicName("dlq-sink-topic"); + + // Inject an invalid configuration and check that fails + runTest(connectorPropertyFactory); + + checkThatFailed(); + } catch (Exception e) { + LOG.error("JMS test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } +}