This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 4b7ba490a56b7196774ba9e5e00c87e0f4a7bc74
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Tue Feb 2 17:43:09 2021 +0100

    Added a basic test for idempotency
---
 .../common/BasicConnectorPropertyFactory.java      |   4 +
 .../common/IdempotencyConfigBuilder.java           |  78 +++++++++
 .../kafkaconnector/sjms2/clients/JMSClient.java    |  43 +++--
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   | 192 +++++++++++++++++++++
 4 files changed, 307 insertions(+), 10 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
index a9d012e..0e98490 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
@@ -63,6 +63,10 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp
         return (T) this;
     }
 
+    public IdempotencyConfigBuilder<T> withIdempotency() {
+        return new IdempotencyConfigBuilder<>((T) this, connectorProps);
+    }
+
     /**
      * This enables sending failed records to the DLQ. Note: it automatically configure other required/recommended
      * options!
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java
new file mode 100644
index 0000000..2cde885
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common;
+
+import java.util.Properties;
+
+public class IdempotencyConfigBuilder<T extends ConnectorPropertyFactory> {
+    private final T handle;
+    private final Properties properties;
+
+    public IdempotencyConfigBuilder(T handle, Properties properties) {
+        this.handle = handle;
+        this.properties = properties;
+
+        withEnabled(true);
+    }
+
+    private IdempotencyConfigBuilder<T> withEntry(String key, Object value) {
+        properties.put("camel.idempotency." + key, value);
+
+        return this;
+    }
+
+    public IdempotencyConfigBuilder<T> withEnabled(boolean value) {
+        return withEntry("enabled", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withRepositoryType(String value) {
+        return withEntry("repository.type", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withExpressionType(String value) {
+        return withEntry("expression.type", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withExpressionHeader(String value) {
+        return withEntry("expression.header", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withMemoryDimension(String value) {
+        return withEntry("memory.dimension", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withKafkaTopic(String value) {
+        return withEntry("kafka.topic", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withKafkaBootstrapServers(String value) {
+        return withEntry("kafka.bootstrap.servers", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withKafkaMaxCacheSize(String value) {
+        return withEntry("kafka.max.cache.size", value);
+    }
+
+    public IdempotencyConfigBuilder<T> withKafkaPollDurationMs(String value) {
+        return withEntry("kafka.poll.duration.ms", value);
+    }
+
+    public T end() {
+        return handle;
+    }
+}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
index 43586c6..42e4103 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
@@ -161,6 +161,37 @@ public class JMSClient {
         }
     }
 
+    /**
+     * Receives from the consumer, waiting up to {@code timeout} per call, until the predicate returns false
+     *
+     * @param predicate the predicate used to test each received message; a false result ends the loop
+     * @throws JMSException if receiving from the consumer fails
+     */
+    public void receive(MessageConsumer consumer, Predicate<Message> predicate, long timeout) throws JMSException {
+        while (true) {
+            final Message message = consumer.receive(timeout);
+
+            if (!predicate.test(message)) {
+                return;
+            }
+        }
+    }
+
+
+    /**
+     * Receives from the consumer, using a timeout of 3000 per call, until the predicate returns false
+     *
+     * @param predicate the predicate used to test each received message; a false result ends the loop
+     * @throws JMSException if receiving from the consumer fails
+     */
+    public void receive(MessageConsumer consumer, Predicate<Message> predicate) throws JMSException {
+        receive(consumer, predicate, 3000);
+    }
+
+    public MessageConsumer createConsumer(String queue) throws JMSException {
+        return session.createConsumer(createDestination(queue));
+    }
+
     /**
      * Receives data from a JMS queue or topic
@@ -170,20 +201,12 @@ public class JMSClient {
      * @throws JMSException
      */
     public void receive(final String queue, Predicate<Message> predicate) throws JMSException {
-        final long timeout = 3000;
-
         MessageConsumer consumer = null;
 
         try {
-            consumer = session.createConsumer(createDestination(queue));
-
-            while (true) {
-                final Message message = consumer.receive(timeout);
+            consumer = createConsumer(queue);
 
-                if (!predicate.test(message)) {
-                    return;
-                }
-            }
+            receive(consumer, predicate);
         } finally {
             capturingClose(consumer);
         }
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
new file mode 100644
index 0000000..8eceee2
--- /dev/null
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sjms2.sink;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
+import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
+import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
+import org.apache.camel.test.infra.messaging.services.MessagingService;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration test for the JMS sink with idempotency enabled: each Kafka record is produced twice
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static MessagingService jmsService = MessagingServiceBuilder
+            .newBuilder(DispatchRouterContainer::new)
+            .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+            .build();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class);
+
+    private String topic;
+    private int received;
+    private final int expect = 10;
+
+    private Properties connectionProperties() {
+        Properties properties = new Properties();
+
+        properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+        properties.put("camel.component.sjms2.connection-factory.remoteURI", jmsService.defaultEndpoint());
+
+        return properties;
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-sjms2-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
+        received = 0;
+        topic = TestUtils.getDefaultTestTopic(this.getClass());
+    }
+
+    private boolean checkRecord(Message jmsMessage) {
+        if (jmsMessage instanceof TextMessage) {
+            try {
+                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
+
+                received++;
+
+                return true;
+            } catch (JMSException e) {
+                LOG.error("Failed to read message: {}", e.getMessage(), e);
+                fail("Failed to read message: " + e.getMessage());
+            }
+        }
+
+        return false;
+    }
+
+
+    private void consumeJMSMessages() {
+        JMSClient jmsClient = null;
+
+        try {
+            jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
+            jmsClient.start();
+
+            try (MessageConsumer consumer = jmsClient.createConsumer(SJMS2Common.DEFAULT_JMS_QUEUE)) {
+                // consecutive receive rounds to sit at the expected count before the loop ends
+                int retries = 10;
+
+                while (retries > 0) {
+                    LOG.debug("Waiting for JMS messages (received {} of {} / retry {})", received, expect, retries);
+                    jmsClient.receive(consumer, this::checkRecord, 1000);
+
+                    // Count down only while received equals expect; any other count resets the countdown
+                    if (expect == received) {
+                        retries--;
+                    } else {
+                        retries = 10;
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted, stopping ...");
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        } finally {
+            if (jmsClient != null) {
+                jmsClient.stop();
+            }
+        }
+    }
+
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        ExecutorService service = Executors.newCachedThreadPool();
+
+        LOG.debug("Creating the consumer ...");
+        service.submit(() -> consumeJMSMessages());
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            LOG.debug("Sending message 1/2");
+            kafkaClient.produce(topic, "Sink test message " + i);
+            LOG.debug("Sending message 2/2");
+            kafkaClient.produce(topic, "Sink test message " + i);
+        }
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown();
+
+        if (service.awaitTermination(25, TimeUnit.SECONDS)) {
+            assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testIdempotentBodySendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(topic)
+                    .withConnectionProperties(connectionProperties())
+                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .withIdempotency()
+                    .withRepositoryType("memory")
+                    .withExpressionType("body")
+                    .end();
+
+            runTest(connectorPropertyFactory);
+
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+}