This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch topics.regex in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/topics.regex by this push: new 0618b2e Topics.regex: Added unit test 0618b2e is described below commit 0618b2ee6ac5c513628d9b71356f52d4a9f452d3 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Mon Sep 14 11:09:36 2020 +0200 Topics.regex: Added unit test --- .../camel/kafkaconnector/CamelSinkTaskTest.java | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 84df83d..a050943 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -67,6 +67,35 @@ public class CamelSinkTaskTest { sinkTask.stop(); } + + @Test + public void testTopicsRegex() { + Map<String, String> props = new HashMap<>(); + props.put("topics.regex", "topic1*"); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord("topic1", 1, null, "test", null, "camel", 42); + SinkRecord record1 = new SinkRecord("topic12", 1, null, "test", null, "cameltopicregex", 42); + records.add(record); + records.add(record1); + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + Exchange exchange1 = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("cameltopicregex", exchange1.getMessage().getBody()); + assertEquals("test", exchange1.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + + sinkTask.stop(); + } @Test public void testBodyAndHeaders() {