This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit a108cf158b8e7a8eec3b7e7d566646abdb770e74 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Jan 28 08:55:02 2021 +0100 Add a map Camel Properties to Kafka headers option to make the behavior configurable --- .../camel/kafkaconnector/CamelConnectorConfig.java | 4 ++ .../kafkaconnector/CamelSinkConnectorConfig.java | 3 +- .../apache/camel/kafkaconnector/CamelSinkTask.java | 6 ++- .../kafkaconnector/CamelSourceConnectorConfig.java | 3 +- .../camel/kafkaconnector/CamelSourceTask.java | 9 +++- .../camel/kafkaconnector/CamelSinkTaskTest.java | 59 ++++++++++++++++++++++ .../camel/kafkaconnector/CamelSourceTaskTest.java | 24 +++++++++ 7 files changed, 103 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java index 00c3ce4..196b872 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java @@ -43,6 +43,10 @@ public abstract class CamelConnectorConfig extends AbstractConfig { public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF = "camel.remove.headers.pattern"; public static final String CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC = "The pattern of the headers we want to exclude from the exchange"; + public static final Boolean CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT = true; + public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_CONF = "camel.map.properties"; + public static final String CAMEL_CONNECTOR_MAP_PROPERTIES_DOC = "If set to true, the connector will transform the exchange properties into kafka headers."; + public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0; public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries"; public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler"; diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index 6350fe9..f40749d 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -67,7 +67,8 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC) - .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC); + .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC) + .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index e44676b..a1bbb1e 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -59,6 +59,7 @@ public class CamelSinkTask extends SinkTask { private ProducerTemplate producer; private Endpoint localEndpoint; private LoggingLevel loggingLevel = LoggingLevel.OFF; + private boolean mapProperties; @Override public String version() { @@ -101,6 +102,7 @@ public class CamelSinkTask extends SinkTask { final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF); final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF); final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF); + mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { @@ -173,7 +175,9 @@ public class CamelSinkTask extends SinkTask { if (header.key().startsWith(HEADER_CAMEL_PREFIX)) { mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders()); } else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) { - mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties()); + if (mapProperties) { + mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties()); + } } } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index a703b18..6384961 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -103,7 +103,8 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig { .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_CONF, Type.STRING, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_BOOTSTRAP_SERVERS_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_DOC) .define(CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF, Type.INT, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_DOC) - .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC); + .define(CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, Type.STRING, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_DOC) + .define(CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, Type.BOOLEAN, CAMEL_CONNECTOR_MAP_PROPERTIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_MAP_PROPERTIES_DOC);; public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index bd3aee6..f45b78e 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -58,6 +58,7 @@ public class CamelSourceTask extends SourceTask { private Long maxPollDuration; private String camelMessageHeaderKey; private LoggingLevel loggingLevel = LoggingLevel.OFF; + private boolean mapProperties; @Override public String version() { @@ -101,6 +102,7 @@ public class CamelSourceTask extends SourceTask { final int idempotentRepositoryKafkaMaxCacheSize = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_MAX_CACHE_SIZE_CONF); final int idempotentRepositoryKafkaPollDuration = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_KAFKA_POLL_DURATION_CONF); final String headersRemovePattern = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF); + mapProperties = config.getBoolean(CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF); topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); @@ -188,8 +190,11 @@ public class CamelSourceTask extends SourceTask { if (exchange.getMessage().hasHeaders()) { setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); } - if (exchange.hasProperties()) { - setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX); + + if (mapProperties) { + if (exchange.hasProperties()) { + setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX); + } } TaskHelper.logRecordContent(LOG, loggingLevel, record); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 047d858..93661a7 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -670,6 +671,64 @@ public class CamelSinkTaskTest { sinkTask.stop(); } + + @Test + public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() { + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false"); + + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + Byte myByte = new Byte("100"); + Float myFloat = new Float("100"); + Short myShort = new Short("100"); + Double myDouble = new Double("100"); + int myInteger = 100; + Long myLong = new Long("100"); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + record.headers().addBoolean(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyBoolean", true); + record.headers().addByte(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyByte", myByte); + record.headers().addFloat(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyFloat", myFloat); + record.headers().addShort(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyShort", myShort); + record.headers().addDouble(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyDouble", myDouble); + record.headers().addInt(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyInteger", myInteger); + record.headers().addLong(CamelSinkTask.PROPERTY_CAMEL_PREFIX + "MyLong", myLong); + record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true); + record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte); + record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat); + record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort); + record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble); + record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger); + record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong); + records.add(record); + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertFalse(exchange.getProperties().containsKey("MyBoolean")); + assertFalse(exchange.getProperties().containsKey("MyByte")); + assertFalse(exchange.getProperties().containsKey("MyFloat")); + assertFalse(exchange.getProperties().containsKey("MyShort")); + assertFalse(exchange.getProperties().containsKey("MyDouble")); + assertFalse(exchange.getProperties().containsKey("MyInteger")); + assertFalse(exchange.getProperties().containsKey("MyLong")); + assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class)); + assertEquals(myByte, exchange.getIn().getHeader("MyByte", Byte.class)); + assertEquals(myFloat, exchange.getIn().getHeader("MyFloat", Float.class)); + assertEquals(myShort, exchange.getIn().getHeader("MyShort", Short.class)); + assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); + assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); + assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); + + sinkTask.stop(); + } @Test public void testIfExchangeFailsShouldThrowConnectException() { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index fadbbb4..f90876e 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -331,6 +331,30 @@ public class CamelSourceTaskTest { sourceTask.stop(); } } + + @Test + public void testSourceByteArrayProperty() { + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(mapOf( + CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, + CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false", + CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start" + )); + + sourceTask.getCms().getProducerTemplate().sendBodyAndProperty(DIRECT_URI, "test", "byteArray", new Byte[] { + 1, 2 + }); + + try { + List<SourceRecord> results = sourceTask.poll(); + assertThat(results).hasSize(1); + + assertEquals(0, results.get(0).headers().size()); + } finally { + sourceTask.stop(); + } + } @Test public void testSourceDateHeader() {