This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 314eee9 Added another test about removal of headers based on reg exp
314eee9 is described below

commit 314eee947e6f9c979a9c1ddbcd5516c3f51b6539
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Wed Dec 9 07:31:47 2020 +0100

    Added another test about removal of headers based on reg exp
---
 .../camel/kafkaconnector/CamelSinkTaskTest.java | 48 ++++++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 74ebd32..047d858 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -195,6 +195,54 @@ public class CamelSinkTaskTest {

         sinkTask.stop();
     }
+
+    @Test
+    public void testBodyAndHeadersExclusionsRegex() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        BigDecimal myBigDecimal = new BigDecimal(1234567890);
+        Schema schema = Decimal.schema(myBigDecimal.scale());
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
+        record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte);
+        record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat);
+        record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort);
+        record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble);
+        record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
+        record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
+        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+        records.add(record);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertNull(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertNull(exchange.getIn().getHeader("MyByte", Byte.class));
+        assertNull(exchange.getIn().getHeader("MyFloat", Float.class));
+        assertNull(exchange.getIn().getHeader("MyShort", Short.class));
+        assertNull(exchange.getIn().getHeader("MyDouble", Double.class));
+        assertNull(exchange.getIn().getHeader("MyInteger"));
+        assertNull(exchange.getIn().getHeader("MyLong", Long.class));
+        assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
+
+        sinkTask.stop();
+    }

     @Test
     public void testBodyAndProperties() {