This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-kafka-connector-0.7.x in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-kafka-connector-0.7.x by this push: new 2e52089 Ensure the DLQ configuration from Kafka Connect is correctly handled (issue #835) 2e52089 is described below commit 2e520895550380e06a6e4938e0a359d18daf6609 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Fri Jan 8 12:01:51 2021 +0100 Ensure the DLQ configuration from Kafka Connect is correctly handled (issue #835) --- .../apache/camel/kafkaconnector/CamelSinkTask.java | 14 ++- .../common/BasicConnectorPropertyFactory.java | 15 +++ .../sjms2/sink/CamelSinkWithDLQJMSITCase.java | 127 +++++++++++++++++++++ 3 files changed, 155 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 82f2f05..e44676b 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.slf4j.Logger; @@ -51,6 +52,7 @@ public class CamelSinkTask extends SinkTask { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class); private static final String LOCAL_URL = "direct:start"; + private ErrantRecordReporter reporter; private CamelKafkaConnectMain cms; @@ -70,6 +72,10 @@ public class CamelSinkTask extends SinkTask { Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props); CamelSinkConnectorConfig config = getCamelSinkConnectorConfig(actualProps); + if (context != null) { + reporter = 
context.errantRecordReporter(); + } + try { String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase()); @@ -175,7 +181,13 @@ public class CamelSinkTask extends SinkTask { producer.send(localEndpoint, exchange); if (exchange.isFailed()) { - throw new ConnectException("Exchange delivery has failed!", exchange.getException()); + if (reporter == null) { + LOG.warn("A delivery has failed and the error reporting is NOT enabled. Records may be lost or ignored"); + throw new ConnectException("Exchange delivery has failed!", exchange.getException()); + } + + LOG.warn("A delivery has failed and the error reporting is enabled. Sending record to the DLQ"); + reporter.report(record, exchange.getException()); } } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java index d5dd4f6..cee96b9 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java @@ -63,6 +63,21 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp return (T) this; } + /** + * This enables sending failed records to the DLQ. Note: it automatically configures other required/recommended + * options! 
+ * @param topicName the DLQ topic name + * @return this object instance + */ + public T withDeadLetterQueueTopicName(String topicName) { + // There's no constant for the DLQ settings + connectorProps.put("errors.deadletterqueue.topic.name", topicName); + connectorProps.put("errors.deadletterqueue.topic.replication.factor", 1); + connectorProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, true); + + return (T) this; + } + public TransformsConfigBuilder<T> withTransformsConfig(String name) { return new TransformsConfigBuilder<>((T) this, getProperties(), name); } diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java new file mode 100644 index 0000000..d47e4c8 --- /dev/null +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kafkaconnector.sjms2.sink; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Integration tests for the JMS sink with a DLQ configuration. This test forces a failure in the sink connector to + * ensure that the failed records are added to the DLQ configured in Kafka. 
+ */ +@Testcontainers +public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkWithDLQJMSITCase.class); + + private int received; + private final int expect = 10; + private int errors; + private final int expectedErrors = 1; + + private Properties connectionProperties() { + Properties properties = new Properties(); + + properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory"); + properties.put("camel.component.sjms2.connection-factory.remoteURI", "invalid"); + + return properties; + } + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-sjms2-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + received = 0; + errors = 0; + } + + private <T> boolean checkDqlRecord(ConsumerRecord<String, T> record) { + LOG.debug("Received: {}", record.value()); + errors++; + + if (errors >= expectedErrors) { + return false; + } + + return true; + } + + private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + getKafkaConnectService().initializeConnector(connectorPropertyFactory); + + LOG.debug("Creating the consumer ..."); + + + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + for (int i = 0; i < expect; i++) { + kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i); + } + + LOG.debug("Created the consumer ... 
About to receive messages"); + } + + + @Test + @Timeout(10) + public void testSendReceiveWithError() { + try { + Properties brokenProp = connectionProperties(); + + brokenProp.put("camel.component.sjms2.connection-factory.remoteURI", "invalid"); + + ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory + .basic() + .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) + .withConnectionProperties(brokenProp) + .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE) + .withDeadLetterQueueTopicName("dlq-sink-topic"); + + runTest(connectorPropertyFactory); + + KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + kafkaClient.consume("dlq-sink-topic", this::checkDqlRecord); + + assertEquals(expectedErrors, errors, "Didn't process the expected amount of messages"); + + } catch (Exception e) { + LOG.error("JMS test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } +}