This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 99fb9be  CAMEL-12503 : support for propagating camel headers to kafka and vice versa
99fb9be is described below

commit 99fb9be724c59e3c1b2bd77838a114a2d505e2cd
Author: Taras Danylchuk <tdanylc...@playtika.com>
AuthorDate: Thu May 10 17:33:03 2018 +0300

    CAMEL-12503 : support for propagating camel headers to kafka and vice versa
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  25 ++++-
 .../camel/component/kafka/KafkaConfiguration.java  |  36 +++++--
 .../camel/component/kafka/KafkaConsumer.java       |  25 ++++-
 .../component/kafka/KafkaHeaderFilterStrategy.java |  35 +++++++
 .../camel/component/kafka/KafkaProducer.java       |  79 +++++++++++---
 .../component/kafka/KafkaConsumerFullTest.java     |  12 +++
 .../component/kafka/KafkaProducerFullTest.java     | 113 +++++++++++++++++++++
 .../springboot/KafkaComponentConfiguration.java    |  15 +++
 8 files changed, 312 insertions(+), 28 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 7d89604..13244b7 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (90 parameters):
+==== Query Parameters (91 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -80,6 +80,7 @@ with the following path and query parameters:
 | Name | Description | Default | Type
 | *brokers* (common) | URL of the Kafka brokers to use. The format is 
host1:port1,host2:port2, and the list can be a subset of brokers or a VIP 
pointing to a subset of brokers. This option is known as bootstrap.servers in 
the Kafka documentation. |  | String
 | *clientId* (common) | The client id is a user-specified string sent in each 
request to help trace calls. It should logically identify the application 
making the request. |  | String
+| *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to filter headers to and from the Camel message. |  | HeaderFilterStrategy
 | *reconnectBackoffMaxMs* (common) | The maximum amount of time in 
milliseconds to wait when reconnecting to a broker that has repeatedly failed 
to connect. If provided, the backoff per host will increase exponentially for 
each consecutive connection failure, up to this maximum. After calculating the 
backoff increase, 20% random jitter is added to avoid connection storms. | 1000 
| Integer
 | *allowManualCommit* (consumer) | Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then an instance of 
KafkaManualCommit is stored on the Exchange message header, which allows end 
users to access this API and perform manual offset commits via the Kafka 
consumer. | false | boolean
 | *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper 
the offset of messages already fetched by the consumer. This committed offset 
will be used when the process fails as the position from which the new consumer 
will begin. | true | Boolean
@@ -427,3 +428,25 @@ This will force a synchronous commit which will block until the commit is acknow
 
 If you want to use a custom implementation of `KafkaManualCommit` then you can 
configure a custom `KafkaManualCommitFactory`
 on the `KafkaComponent` that creates instances of your custom implementation.
+
+=== Kafka Headers propagation
+*Available as of Camel 2.22*
+
+When consuming messages from Kafka, headers will be propagated to Camel exchange headers automatically.
+The producing flow is backed by the same behaviour: Camel headers of a particular exchange will be propagated to Kafka message headers.
+
+Since Kafka headers allow only `byte[]` values, a Camel exchange header must be serialized to `byte[]` in order to be propagated;
+otherwise the header will be skipped.
+The following header value types are supported: `String`, `Integer`, `Long`, `Double`, `byte[]`.
+Note: all headers propagated *from* Kafka *to* the Camel exchange will contain a `byte[]` value.
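+
+For illustration, since values arrive as raw bytes, a consumer route can decode a propagated header back into its original type itself. A minimal sketch, assuming the producer wrote a `Long` (the topic and header name are only examples):
+```
+from("kafka:my_topic")
+    .process(exchange -> {
+        byte[] raw = exchange.getIn().getHeader("MyCustomHeader", byte[].class);
+        // decode according to the type the producer originally set
+        exchange.getIn().setHeader("MyCustomHeader", java.nio.ByteBuffer.wrap(raw).getLong());
+    });
+```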
+
+By default all headers are filtered by `KafkaHeaderFilterStrategy`.
+The strategy filters out headers which start with the `Camel` or `org.apache.camel` prefix.
+The default strategy can be overridden by using the `headerFilterStrategy` URI parameter in both `to` and `from` routes:
+```
+from("kafka:my_topic?headerFilterStrategy=#myStrategy")
+...
+.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
+```
+
+The `myStrategy` object must be an implementation of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware.
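+
+A minimal sketch of such a custom strategy, assuming you only want to drop a single application-specific header in both directions (the class and header name are illustrative):
+```
+public class MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+    public MyHeaderFilterStrategy() {
+        // header names added to the in/out filters are NOT propagated
+        getInFilter().add("MyInternalHeader");
+        getOutFilter().add("MyInternalHeader");
+    }
+}
+```
+Note that a custom strategy replaces the default `KafkaHeaderFilterStrategy`, so the `Camel` and `org.apache.camel` prefixes are no longer filtered unless your strategy filters them as well.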
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 9b5ba6b..dabd475 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -24,6 +24,8 @@ import java.util.stream.Collectors;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.spi.UriParam;
@@ -43,15 +45,18 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 
 @UriParams
-public class KafkaConfiguration implements Cloneable {
+public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware {
 
     //Common configuration properties
-    @UriPath(label = "common") @Metadata(required = "true")
+    @UriPath(label = "common")
+    @Metadata(required = "true")
     private String topic;
     @UriParam(label = "common")
     private String brokers;
     @UriParam(label = "common")
     private String clientId;
+    @UriParam(label = "common", description = "To use a custom 
HeaderFilterStrategy to filter header to and from Camel message.")
+    private HeaderFilterStrategy headerFilterStrategy = new 
KafkaHeaderFilterStrategy();
 
     @UriParam(label = "consumer")
     private boolean topicIsPattern;
@@ -294,10 +299,10 @@ public class KafkaConfiguration implements Cloneable {
     private Double kerberosRenewWindowFactor = 
SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
     @UriParam(label = "common,security", defaultValue = "DEFAULT")
     //sasl.kerberos.principal.to.local.rules
-    private String kerberosPrincipalToLocalRules; 
+    private String kerberosPrincipalToLocalRules;
     @UriParam(label = "common,security", secret = true)
     //sasl.jaas.config
-    private String saslJaasConfig;   
+    private String saslJaasConfig;
 
     public KafkaConfiguration() {
     }
@@ -343,7 +348,7 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 
getRetryBackoffMs());
         addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
isEnableIdempotence());
         addPropertyIfNotNull(props, 
ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
-        
+
         // SSL
         applySslConfiguration(props, getSslContextParameters());
         addPropertyIfNotNull(props, 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
@@ -403,7 +408,7 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, 
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 
getRetryBackoffMs());
         addPropertyIfNotNull(props, 
ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
-        
+
         // SSL
         applySslConfiguration(props, getSslContextParameters());
         addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
getSslKeyPassword());
@@ -1029,14 +1034,14 @@ public class KafkaConfiguration implements Cloneable {
     public void setSaslMechanism(String saslMechanism) {
         this.saslMechanism = saslMechanism;
     }
-    
+
     public String getSaslJaasConfig() {
         return saslJaasConfig;
     }
 
     /**
      * Expose the kafka sasl.jaas.config parameter
-     * 
+     *
      * Example:
      * org.apache.kafka.common.security.plain.PlainLoginModule required 
username="USERNAME" password="PASSWORD";
      */
@@ -1498,7 +1503,7 @@ public class KafkaConfiguration implements Cloneable {
      * Set if KafkaConsumer will read from beginning or end on startup:
      * beginning : read from beginning
      * end : read from end
-     * 
+     *
      * This is replacing the earlier property seekToBeginning
      */
     public void setSeekTo(String seekTo) {
@@ -1559,6 +1564,7 @@ public class KafkaConfiguration implements Cloneable {
     public String getInterceptorClasses() {
         return interceptorClasses;
     }
+
     /**
      * Sets interceptors for producer or consumers.
      * Producer interceptors have to be classes implementing {@link 
org.apache.kafka.clients.producer.ProducerInterceptor}
@@ -1596,4 +1602,16 @@ public class KafkaConfiguration implements Cloneable {
     public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
         this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
     }
+
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        return headerFilterStrategy;
+    }
+
+    /**
+     * To use a custom HeaderFilterStrategy to filter headers to and from the Camel message.
+     */
+    public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) {
+        this.headerFilterStrategy = headerFilterStrategy;
+    }
+
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 05111f2..c585e05 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -27,10 +27,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.regex.Pattern;
+import java.util.stream.StreamSupport;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -42,6 +44,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.header.Header;
 
 public class KafkaConsumer extends DefaultConsumer {
 
@@ -98,7 +101,7 @@ public class KafkaConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         log.info("Starting Kafka consumer on topic: {} with breakOnFirstError: 
{}",
-            endpoint.getConfiguration().getTopic(), 
endpoint.getConfiguration().isBreakOnFirstError());
+                endpoint.getConfiguration().getTopic(), 
endpoint.getConfiguration().isBreakOnFirstError());
         super.doStart();
 
         executor = endpoint.createExecutor();
@@ -276,10 +279,12 @@ public class KafkaConsumer extends DefaultConsumer {
                                 record = recordIterator.next();
                                 if (log.isTraceEnabled()) {
                                     log.trace("Partition = {}, offset = {}, 
key = {}, value = {}", record.partition(), record.offset(), record.key(),
-                                              record.value());
+                                            record.value());
                                 }
                                 Exchange exchange = 
endpoint.createKafkaExchange(record);
 
+                                propagateHeaders(record, exchange, endpoint.getConfiguration().getHeaderFilterStrategy());
+
                                 // if not auto commit then we have additional 
information on the exchange
                                 if (!isAutoCommitEnabled()) {
                                     
exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, 
!recordIterator.hasNext());
@@ -287,9 +292,9 @@ public class KafkaConsumer extends DefaultConsumer {
                                 if 
(endpoint.getConfiguration().isAllowManualCommit()) {
                                     // allow Camel users to access the Kafka 
consumer API to be able to do for example manual commits
                                     KafkaManualCommit manual = 
endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange, 
consumer, topicName, threadId,
-                                        offsetRepository, partition, 
record.offset());
+                                            offsetRepository, partition, 
record.offset());
                                     
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
-                                    
+
                                 }
 
                                 try {
@@ -303,7 +308,7 @@ public class KafkaConsumer extends DefaultConsumer {
                                     if 
(endpoint.getConfiguration().isBreakOnFirstError()) {
                                         // we are failing and we should break 
out
                                         log.warn("Error during processing {} 
from topic: {}. Will seek consumer to offset: {} and re-connect and start 
polling again.",
-                                            exchange, topicName, 
partitionLastOffset);
+                                                exchange, topicName, 
partitionLastOffset);
                                         // force commit so we resume on next 
poll where we failed
                                         commitOffset(offsetRepository, 
partition, partitionLastOffset, true);
                                         // continue to next partition
@@ -423,6 +428,16 @@ public class KafkaConsumer extends DefaultConsumer {
         }
     }
 
+    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        StreamSupport.stream(record.headers().spliterator(), false)
+                .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
+                .forEach(header -> exchange.getIn().setHeader(header.key(), header.value()));
+    }
+
+    private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        return !headerFilterStrategy.applyFilterToCamelHeaders(header.key(), header.value(), exchange);
+    }
+
     private boolean isAutoCommitEnabled() {
         return endpoint.getConfiguration().isAutoCommitEnable() != null && 
endpoint.getConfiguration().isAutoCommitEnable();
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
new file mode 100644
index 0000000..3c55daa
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+
+public class KafkaHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+
+    public KafkaHeaderFilterStrategy() {
+        initialize();
+    }
+
+    protected void initialize() {
+        // filter out kafka record metadata
+        getInFilter().add("org.apache.kafka.clients.producer.RecordMetadata");
+
+        // filter out headers beginning with "Camel" or "org.apache.camel"
+        setOutFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.a-zA-Z0-9]*");
+        setInFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.a-zA-Z0-9]*");
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 4ec8ef4..f4a7e1a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -23,20 +23,26 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.URISupport;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.Bytes;
 
 public class KafkaProducer extends DefaultAsyncProducer {
@@ -142,8 +148,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
                     allowHeader = !headerTopic.equals(fromTopic);
                     if (!allowHeader) {
                         log.debug("Circular topic detected from message 
header."
-                            + " Cannot send to same topic as the message comes 
from: {}"
-                            + ". Will use endpoint configured topic: {}", 
from, topic);
+                                + " Cannot send to same topic as the message 
comes from: {}"
+                                + ". Will use endpoint configured topic: {}", 
from, topic);
                     }
                 }
             }
@@ -159,24 +165,28 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         // endpoint take precedence over header configuration
         final Integer partitionKey = 
endpoint.getConfiguration().getPartitionKey() != null
-            ? endpoint.getConfiguration().getPartitionKey() : 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+                ? endpoint.getConfiguration().getPartitionKey() : 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
         final boolean hasPartitionKey = partitionKey != null;
 
         // endpoint take precedence over header configuration
         Object key = endpoint.getConfiguration().getKey() != null
-            ? endpoint.getConfiguration().getKey() : 
exchange.getIn().getHeader(KafkaConstants.KEY);
+                ? endpoint.getConfiguration().getKey() : 
exchange.getIn().getHeader(KafkaConstants.KEY);
         final Object messageKey = key != null
-            ? tryConvertToSerializedType(exchange, key, 
endpoint.getConfiguration().getKeySerializerClass()) : null;
+                ? tryConvertToSerializedType(exchange, key, 
endpoint.getConfiguration().getKeySerializerClass()) : null;
         final boolean hasMessageKey = messageKey != null;
 
+        // extracting headers which need to be propagated
+        HeaderFilterStrategy headerFilterStrategy = endpoint.getConfiguration().getHeaderFilterStrategy();
+        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, headerFilterStrategy);
+
         Object msg = exchange.getIn().getBody();
 
         // is the message body a list or something that contains multiple 
values
         Iterator<Object> iterator = null;
         if (msg instanceof Iterable) {
-            iterator = ((Iterable<Object>)msg).iterator();
+            iterator = ((Iterable<Object>) msg).iterator();
         } else if (msg instanceof Iterator) {
-            iterator = (Iterator<Object>)msg;
+            iterator = (Iterator<Object>) msg;
         }
         if (iterator != null) {
             final Iterator<Object> msgList = iterator;
@@ -194,11 +204,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
                     Object value = tryConvertToSerializedType(exchange, next, 
endpoint.getConfiguration().getSerializerClass());
 
                     if (hasPartitionKey && hasMessageKey) {
-                        return new ProducerRecord(msgTopic, partitionKey, key, value);
+                        return new ProducerRecord(msgTopic, partitionKey, null, key, value, propagatedHeaders);
                     } else if (hasMessageKey) {
-                        return new ProducerRecord(msgTopic, key, value);
+                        return new ProducerRecord(msgTopic, null, null, key, value, propagatedHeaders);
                     } else {
-                        return new ProducerRecord(msgTopic, value);
+                        return new ProducerRecord(msgTopic, null, null, null, value, propagatedHeaders);
                     }
                 }
 
@@ -214,15 +224,58 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         ProducerRecord record;
         if (hasPartitionKey && hasMessageKey) {
-            record = new ProducerRecord(topic, partitionKey, key, value);
+            record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders);
         } else if (hasMessageKey) {
-            record = new ProducerRecord(topic, key, value);
+            record = new ProducerRecord(topic, null, null, key, value, propagatedHeaders);
         } else {
-            record = new ProducerRecord(topic, value);
+            record = new ProducerRecord(topic, null, null, null, value, propagatedHeaders);
         }
         return Collections.singletonList(record).iterator();
     }
 
+    private List<Header> getPropagatedHeaders(Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        return exchange.getIn().getHeaders().entrySet().stream()
+                .filter(entry -> shouldBeFiltered(entry, exchange, headerFilterStrategy))
+                .map(this::getRecordHeader)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
+
+    private boolean shouldBeFiltered(Map.Entry<String, Object> entry, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        return !headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange);
+    }
+
+    private RecordHeader getRecordHeader(Map.Entry<String, Object> entry) {
+        byte[] headerValue = getHeaderValue(entry.getValue());
+        if (headerValue == null) {
+            return null;
+        }
+        return new RecordHeader(entry.getKey(), headerValue);
+    }
+
+    private byte[] getHeaderValue(Object value) {
+        if (value instanceof String) {
+            return ((String) value).getBytes();
+        } else if (value instanceof Long) {
+            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+            buffer.putLong((Long) value);
+            return buffer.array();
+        } else if (value instanceof Integer) {
+            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+            buffer.putInt((Integer) value);
+            return buffer.array();
+        } else if (value instanceof Double) {
+            ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
+            buffer.putDouble((Double) value);
+            return buffer.array();
+        } else if (value instanceof byte[]) {
+            return (byte[]) value;
+        }
+        log.debug("Cannot propagate header value of type[{}], skipping... " +
+                "Supported types: String, Integer, Long, Double, byte[].", value != null ? value.getClass() : "null");
+        return null;
+    }
+
     @Override
     @SuppressWarnings({"unchecked", "rawtypes"})
     // Camel calls this method if the endpoint isSynchronous(), as the 
KafkaEndpoint creates a SynchronousDelegateProducer for it
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 5b35c8e..17272a4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.StreamSupport;
 
@@ -25,6 +26,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -71,20 +73,30 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
 
     @Test
     public void kafkaMessageIsConsumedByCamel() throws InterruptedException, 
IOException {
+        String propagatedHeaderKey = "PropagatedCustomHeader";
+        byte[] propagatedHeaderValue = "propagated header value".getBytes();
+        String skippedHeaderKey = "CamelSkippedHeader";
         to.expectedMessageCount(5);
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
         // The LAST_RECORD_BEFORE_COMMIT header should not be configured on 
any exchange because autoCommitEnable=true
         
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
 null, null, null, null, null);
+        to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
 
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
             ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
"1", msg);
+            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped 
header value".getBytes()));
+            data.headers().add(new RecordHeader(propagatedHeaderKey, 
propagatedHeaderValue));
             producer.send(data);
         }
 
         to.assertIsSatisfied(3000);
 
         assertEquals(5, 
StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(),
 false).count());
+
+        Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders();
+        assertFalse("Should not receive skipped header", headers.containsKey(skippedHeaderKey));
+        assertTrue("Should receive propagated header", headers.containsKey(propagatedHeaderKey));
     }
 
     @Test
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index a7e43da..643c783 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,14 +17,17 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
@@ -33,9 +36,14 @@ import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -48,24 +56,32 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     private static final String TOPIC_BYTES = "testBytes";
     private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
     private static final String GROUP_BYTES = "groupStrings";
+    private static final String TOPIC_PROPAGATED_HEADERS = "testPropagatedHeaders";
 
     private static KafkaConsumer<String, String> stringsConsumerConn;
     private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
 
     @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
     private Endpoint toStrings;
+
     @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + 
"?requestRequiredAcks=-1&partitionKey=1")
     private Endpoint toStrings2;
+
     @EndpointInject(uri = "kafka:" + TOPIC_INTERCEPTED + 
"?requestRequiredAcks=-1"
             + 
"&interceptorClasses=org.apache.camel.component.kafka.MockProducerInterceptor")
     private Endpoint toStringsWithInterceptor;
+
     @EndpointInject(uri = "mock:kafkaAck")
     private MockEndpoint mockEndpoint;
+
     @EndpointInject(uri = "kafka:" + TOPIC_BYTES + "?requestRequiredAcks=-1"
             + 
"&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
             + 
"keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
     private Endpoint toBytes;
 
+    @EndpointInject(uri = "kafka:" + TOPIC_PROPAGATED_HEADERS + 
"?requestRequiredAcks=-1")
+    private Endpoint toPropagatedHeaders;
+
     @Produce(uri = "direct:startStrings")
     private ProducerTemplate stringsTemplate;
 
@@ -78,6 +94,16 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     @Produce(uri = "direct:startTraced")
     private ProducerTemplate interceptedTemplate;
 
+    @Produce(uri = "direct:propagatedHeaders")
+    private ProducerTemplate propagatedHeadersTemplate;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myStrategy", new MyHeaderFilterStrategy());
+        return jndi;
+    }
+
     @BeforeClass
     public static void before() {
         Properties stringsProps = new Properties();
@@ -118,6 +144,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
                 from("direct:startBytes").to(toBytes).to(mockEndpoint);
 
                 
from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);
+
+                from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint);
             }
         };
     }
@@ -271,6 +299,88 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         }
     }
 
+    @Test
+    public void propagatedHeaderIsReceivedByKafka() throws Exception {
+        String propagatedStringHeaderKey = "PROPAGATED_STRING_HEADER";
+        String propagatedStringHeaderValue = "propagated string header value";
+
+        String propagatedIntegerHeaderKey = "PROPAGATED_INTEGER_HEADER";
+        Integer propagatedIntegerHeaderValue = 54545;
+
+        String propagatedLongHeaderKey = "PROPAGATED_LONG_HEADER";
+        Long propagatedLongHeaderValue = 5454545454545L;
+
+        String propagatedDoubleHeaderKey = "PROPAGATED_DOUBLE_HEADER";
+        Double propagatedDoubleHeaderValue = 43434.545D;
+
+        String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER";
+        byte[] propagatedBytesHeaderValue = new byte[]{121, 34, 34, 54, 5, 3, 54, -34};
+
+        Map<String, Object> camelHeaders = new HashMap<>();
+        camelHeaders.put(propagatedStringHeaderKey, propagatedStringHeaderValue);
+        camelHeaders.put(propagatedIntegerHeaderKey, propagatedIntegerHeaderValue);
+        camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue);
+        camelHeaders.put(propagatedDoubleHeaderKey, propagatedDoubleHeaderValue);
+        camelHeaders.put(propagatedBytesHeaderKey, propagatedBytesHeaderValue);
+        camelHeaders.put("CustomObjectHeader", new Object());
+        camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value");
+
+        CountDownLatch messagesLatch = new CountDownLatch(1);
+        propagatedHeadersTemplate.sendBodyAndHeaders("Some test message", camelHeaders);
+
+        List<ConsumerRecord<String, String>> records = pollForRecords(stringsConsumerConn, TOPIC_PROPAGATED_HEADERS, messagesLatch);
+        boolean allMessagesReceived = messagesLatch.await(10_000, TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were received from the kafka topic. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+
+        ConsumerRecord<String, String> record = records.get(0);
+        Headers headers = record.headers();
+        assertNotNull("Kafka Headers should not be null.", headers);
+        assertEquals("Five propagated headers are expected.", 5, headers.toArray().length);
+        assertEquals("Propagated string value received", propagatedStringHeaderValue,
+                new String(getHeaderValue(propagatedStringHeaderKey, headers)));
+        assertEquals("Propagated integer value received", propagatedIntegerHeaderValue,
+                new Integer(ByteBuffer.wrap(getHeaderValue(propagatedIntegerHeaderKey, headers)).getInt()));
+        assertEquals("Propagated long value received", propagatedLongHeaderValue,
+                new Long(ByteBuffer.wrap(getHeaderValue(propagatedLongHeaderKey, headers)).getLong()));
+        assertEquals("Propagated double value received", propagatedDoubleHeaderValue,
+                new Double(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey, headers)).getDouble()));
+        assertArrayEquals("Propagated byte array value received", propagatedBytesHeaderValue, getHeaderValue(propagatedBytesHeaderKey, headers));
+    }
+
+    @Test
+    public void headerFilterStrategyCouldBeOverridden() {
+        KafkaEndpoint kafkaEndpoint = context.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy", KafkaEndpoint.class);
+        assertIsInstanceOf(MyHeaderFilterStrategy.class, kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());
+    }
+
+    private byte[] getHeaderValue(String headerKey, Headers headers) {
+        Header foundHeader = StreamSupport.stream(headers.spliterator(), false)
+                .filter(header -> header.key().equals(headerKey))
+                .findFirst()
+                .orElse(null);
+        assertNotNull("Header should be sent", foundHeader);
+        return foundHeader.value();
+    }
+
+    private List<ConsumerRecord<String, String>> pollForRecords(KafkaConsumer<String, String> consumerConn,
+                                                                String topic, CountDownLatch messagesLatch) {
+
+        List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
+        consumerConn.subscribe(Collections.singletonList(topic));
+
+        new Thread(() -> {
+            while (messagesLatch.getCount() != 0) {
+                for (ConsumerRecord<String, String> record : consumerConn.poll(100)) {
+                    consumedRecords.add(record);
+                    messagesLatch.countDown();
+                }
+            }
+        }).start();
+
+        return consumedRecords;
+    }
+
     private void createKafkaMessageConsumer(KafkaConsumer<String, String> 
consumerConn,
                                             String topic, String 
topicInHeader, CountDownLatch messagesLatch) {
 
@@ -323,4 +433,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         }
     }
 
+    private static class MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+    }
+
 }
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index f5624b1..e46ad41 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.springboot;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Generated;
 import org.apache.camel.component.kafka.KafkaManualCommitFactory;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
 import org.apache.camel.util.jsse.SSLContextParameters;
@@ -751,6 +752,11 @@ public class KafkaComponentConfiguration
          * increase, 20% random jitter is added to avoid connection storms.
          */
         private Integer reconnectBackoffMaxMs = 1000;
+        /**
+         * To use a custom HeaderFilterStrategy to filter headers to and from
+         * the Camel message.
+         */
+        private HeaderFilterStrategy headerFilterStrategy;
 
         public Boolean getTopicIsPattern() {
             return topicIsPattern;
@@ -1452,5 +1458,14 @@ public class KafkaComponentConfiguration
         public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
             this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
         }
+
+        public HeaderFilterStrategy getHeaderFilterStrategy() {
+            return headerFilterStrategy;
+        }
+
+        public void setHeaderFilterStrategy(
+                HeaderFilterStrategy headerFilterStrategy) {
+            this.headerFilterStrategy = headerFilterStrategy;
+        }
     }
 }
\ No newline at end of file
