This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 99fb9be CAMEL-12503 : support for propagating camel headers to kafka and vice versa 99fb9be is described below commit 99fb9be724c59e3c1b2bd77838a114a2d505e2cd Author: Taras Danylchuk <tdanylc...@playtika.com> AuthorDate: Thu May 10 17:33:03 2018 +0300 CAMEL-12503 : support for propagating camel headers to kafka and vice versa --- .../camel-kafka/src/main/docs/kafka-component.adoc | 25 ++++- .../camel/component/kafka/KafkaConfiguration.java | 36 +++++-- .../camel/component/kafka/KafkaConsumer.java | 25 ++++- .../component/kafka/KafkaHeaderFilterStrategy.java | 35 +++++++ .../camel/component/kafka/KafkaProducer.java | 79 +++++++++++--- .../component/kafka/KafkaConsumerFullTest.java | 12 +++ .../component/kafka/KafkaProducerFullTest.java | 113 +++++++++++++++++++++ .../springboot/KafkaComponentConfiguration.java | 15 +++ 8 files changed, 312 insertions(+), 28 deletions(-) diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 7d89604..13244b7 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -72,7 +72,7 @@ with the following path and query parameters: |=== -==== Query Parameters (90 parameters): +==== Query Parameters (91 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -80,6 +80,7 @@ with the following path and query parameters: | Name | Description | Default | Type | *brokers* (common) | URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation. | | String | *clientId* (common) | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. 
| | String +| *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to filter header to and from Camel message. | | HeaderFilterStrategy | *reconnectBackoffMaxMs* (common) | The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. | 1000 | Integer | *allowManualCommit* (consumer) | Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of KafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. | false | boolean | *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. | true | Boolean @@ -427,3 +428,25 @@ This will force a synchronous commit which will block until the commit is acknow If you want to use a custom implementation of `KafkaManualCommit` then you can configure a custom `KafkaManualCommitFactory` on the `KafkaComponent` that creates instances of your custom implementation. + +=== Kafka Headers propagation +*Available as of Camel 2.22* + +When consuming messages from Kafka, headers will be propagated to camel exchange headers automatically. +The producing flow behaves the same way - Camel headers of a particular exchange will be propagated to Kafka message headers. + +Since Kafka headers allow only `byte[]` values, for a Camel exchange header to be propagated its value must be serializable to `byte[]`, +otherwise the header will be skipped.
+Following header value types are supported: `String`, `Integer`, `Long`, `Double`, `byte[]`. +Note: all headers propagated *from* kafka *to* camel exchange will contain `byte[]` value. + +By default all headers are being filtered by `KafkaHeaderFilterStrategy`. +Strategy filters out headers which start with `Camel` or `org.apache.camel` prefixes. +Default strategy can be overridden by using `headerFilterStrategy` uri parameter in both `to` and `from` routes: +``` +from("kafka:my_topic?headerFilterStrategy=#myStrategy") +... +.to("kafka:my_topic?headerFilterStrategy=#myStrategy") +``` + +`myStrategy` object should be subclass of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware. diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 9b5ba6b..dabd475 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -24,6 +24,8 @@ import java.util.stream.Collectors; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.StateRepository; import org.apache.camel.spi.UriParam; @@ -43,15 +45,18 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; @UriParams -public class KafkaConfiguration implements Cloneable { +public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware { //Common configuration properties - @UriPath(label = "common") @Metadata(required = "true") + @UriPath(label = "common") + 
@Metadata(required = "true") private String topic; @UriParam(label = "common") private String brokers; @UriParam(label = "common") private String clientId; + @UriParam(label = "common", description = "To use a custom HeaderFilterStrategy to filter header to and from Camel message.") + private HeaderFilterStrategy headerFilterStrategy = new KafkaHeaderFilterStrategy(); @UriParam(label = "consumer") private boolean topicIsPattern; @@ -294,10 +299,10 @@ public class KafkaConfiguration implements Cloneable { private Double kerberosRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR; @UriParam(label = "common,security", defaultValue = "DEFAULT") //sasl.kerberos.principal.to.local.rules - private String kerberosPrincipalToLocalRules; + private String kerberosPrincipalToLocalRules; @UriParam(label = "common,security", secret = true) //sasl.jaas.config - private String saslJaasConfig; + private String saslJaasConfig; public KafkaConfiguration() { } @@ -343,7 +348,7 @@ public class KafkaConfiguration implements Cloneable { addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isEnableIdempotence()); addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs()); - + // SSL applySslConfiguration(props, getSslContextParameters()); addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol()); @@ -403,7 +408,7 @@ public class KafkaConfiguration implements Cloneable { addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs()); addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs()); - + // SSL applySslConfiguration(props, getSslContextParameters()); addPropertyIfNotNull(props, 
SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword()); @@ -1029,14 +1034,14 @@ public class KafkaConfiguration implements Cloneable { public void setSaslMechanism(String saslMechanism) { this.saslMechanism = saslMechanism; } - + public String getSaslJaasConfig() { return saslJaasConfig; } /** * Expose the kafka sasl.jaas.config parameter - * + * * Example: * org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; */ @@ -1498,7 +1503,7 @@ public class KafkaConfiguration implements Cloneable { * Set if KafkaConsumer will read from beginning or end on startup: * beginning : read from beginning * end : read from end - * + * * This is replacing the earlier property seekToBeginning */ public void setSeekTo(String seekTo) { @@ -1559,6 +1564,7 @@ public class KafkaConfiguration implements Cloneable { public String getInterceptorClasses() { return interceptorClasses; } + /** * Sets interceptors for producer or consumers. * Producer interceptors have to be classes implementing {@link org.apache.kafka.clients.producer.ProducerInterceptor} @@ -1596,4 +1602,16 @@ public class KafkaConfiguration implements Cloneable { public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) { this.reconnectBackoffMaxMs = reconnectBackoffMaxMs; } + + public HeaderFilterStrategy getHeaderFilterStrategy() { + return headerFilterStrategy; + } + + /** + * To use a custom HeaderFilterStrategy to filter header to and from Camel message. 
+ */ + public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) { + this.headerFilterStrategy = headerFilterStrategy; + } + } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 05111f2..c585e05 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -27,10 +27,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; +import java.util.stream.StreamSupport; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.StateRepository; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; @@ -42,6 +44,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.header.Header; public class KafkaConsumer extends DefaultConsumer { @@ -98,7 +101,7 @@ public class KafkaConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { log.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", - endpoint.getConfiguration().getTopic(), endpoint.getConfiguration().isBreakOnFirstError()); + endpoint.getConfiguration().getTopic(), endpoint.getConfiguration().isBreakOnFirstError()); super.doStart(); executor = endpoint.createExecutor(); @@ -276,10 +279,12 @@ public class KafkaConsumer extends DefaultConsumer { record = recordIterator.next(); if (log.isTraceEnabled()) { log.trace("Partition = {}, offset = {}, key = {}, 
value = {}", record.partition(), record.offset(), record.key(), - record.value()); + record.value()); } Exchange exchange = endpoint.createKafkaExchange(record); + propagateHeaders(record, exchange, endpoint.getConfiguration().getHeaderFilterStrategy()); + // if not auto commit then we have additional information on the exchange if (!isAutoCommitEnabled()) { exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordIterator.hasNext()); @@ -287,9 +292,9 @@ public class KafkaConsumer extends DefaultConsumer { if (endpoint.getConfiguration().isAllowManualCommit()) { // allow Camel users to access the Kafka consumer API to be able to do for example manual commits KafkaManualCommit manual = endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange, consumer, topicName, threadId, - offsetRepository, partition, record.offset()); + offsetRepository, partition, record.offset()); exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual); - + } try { @@ -303,7 +308,7 @@ public class KafkaConsumer extends DefaultConsumer { if (endpoint.getConfiguration().isBreakOnFirstError()) { // we are failing and we should break out log.warn("Error during processing {} from topic: {}. 
Will seek consumer to offset: {} and re-connect and start polling again.", - exchange, topicName, partitionLastOffset); + exchange, topicName, partitionLastOffset); // force commit so we resume on next poll where we failed commitOffset(offsetRepository, partition, partitionLastOffset, true); // continue to next partition @@ -423,6 +428,16 @@ public class KafkaConsumer extends DefaultConsumer { } } + private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) { + StreamSupport.stream(record.headers().spliterator(), false) + .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy)) + .forEach(header -> exchange.getIn().setHeader(header.key(), header.value())); + } + + private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) { + return !headerFilterStrategy.applyFilterToCamelHeaders(header.key(), header.value(), exchange); + } + private boolean isAutoCommitEnabled() { return endpoint.getConfiguration().isAutoCommitEnable() != null && endpoint.getConfiguration().isAutoCommitEnable(); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java new file mode 100644 index 0000000..3c55daa --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import org.apache.camel.impl.DefaultHeaderFilterStrategy; + +public class KafkaHeaderFilterStrategy extends DefaultHeaderFilterStrategy { + + public KafkaHeaderFilterStrategy() { + initialize(); + } + + protected void initialize() { + // filter out kafka record metadata + getInFilter().add("org.apache.kafka.clients.producer.RecordMetadata"); + + // filter headers begin with "Camel" or "org.apache.camel" + setOutFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*"); + setInFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*"); + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 4ec8ef4..f4a7e1a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -23,20 +23,26 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; +import 
org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.URISupport; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.Bytes; public class KafkaProducer extends DefaultAsyncProducer { @@ -142,8 +148,8 @@ public class KafkaProducer extends DefaultAsyncProducer { allowHeader = !headerTopic.equals(fromTopic); if (!allowHeader) { log.debug("Circular topic detected from message header." - + " Cannot send to same topic as the message comes from: {}" - + ". Will use endpoint configured topic: {}", from, topic); + + " Cannot send to same topic as the message comes from: {}" + + ". Will use endpoint configured topic: {}", from, topic); } } } @@ -159,24 +165,28 @@ public class KafkaProducer extends DefaultAsyncProducer { // endpoint take precedence over header configuration final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null - ? endpoint.getConfiguration().getPartitionKey() : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class); + ? endpoint.getConfiguration().getPartitionKey() : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class); final boolean hasPartitionKey = partitionKey != null; // endpoint take precedence over header configuration Object key = endpoint.getConfiguration().getKey() != null - ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY); + ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY); final Object messageKey = key != null - ? tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null; + ? 
tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null; final boolean hasMessageKey = messageKey != null; + // extracting headers which need to be propagated + HeaderFilterStrategy headerFilterStrategy = endpoint.getConfiguration().getHeaderFilterStrategy(); + List<Header> propagatedHeaders = getPropagatedHeaders(exchange, headerFilterStrategy); + Object msg = exchange.getIn().getBody(); // is the message body a list or something that contains multiple values Iterator<Object> iterator = null; if (msg instanceof Iterable) { - iterator = ((Iterable<Object>)msg).iterator(); + iterator = ((Iterable<Object>) msg).iterator(); } else if (msg instanceof Iterator) { - iterator = (Iterator<Object>)msg; + iterator = (Iterator<Object>) msg; } if (iterator != null) { final Iterator<Object> msgList = iterator; @@ -194,11 +204,11 @@ public class KafkaProducer extends DefaultAsyncProducer { Object value = tryConvertToSerializedType(exchange, next, endpoint.getConfiguration().getSerializerClass()); if (hasPartitionKey && hasMessageKey) { - return new ProducerRecord(msgTopic, partitionKey, key, value); + return new ProducerRecord(msgTopic, partitionKey, null, key, value, propagatedHeaders); } else if (hasMessageKey) { - return new ProducerRecord(msgTopic, key, value); + return new ProducerRecord(msgTopic, null, null, key, value, propagatedHeaders); } else { - return new ProducerRecord(msgTopic, value); + return new ProducerRecord(msgTopic, null, null, null, value, propagatedHeaders); } } @@ -214,15 +224,58 @@ public class KafkaProducer extends DefaultAsyncProducer { ProducerRecord record; if (hasPartitionKey && hasMessageKey) { - record = new ProducerRecord(topic, partitionKey, key, value); + record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders); } else if (hasMessageKey) { - record = new ProducerRecord(topic, key, value); + record = new ProducerRecord(topic, null, null, key, value, propagatedHeaders); } 
else { - record = new ProducerRecord(topic, value); + record = new ProducerRecord(topic, null, null, null, value, propagatedHeaders); } return Collections.singletonList(record).iterator(); } + private List<Header> getPropagatedHeaders(Exchange exchange, HeaderFilterStrategy headerFilterStrategy) { + return exchange.getIn().getHeaders().entrySet().stream() + .filter(entry -> shouldBeFiltered(entry, exchange, headerFilterStrategy)) + .map(this::getRecordHeader) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private boolean shouldBeFiltered(Map.Entry<String, Object> entry, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) { + return !headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange); + } + + private RecordHeader getRecordHeader(Map.Entry<String, Object> entry) { + byte[] headerValue = getHeaderValue(entry.getValue()); + if (headerValue == null) { + return null; + } + return new RecordHeader(entry.getKey(), headerValue); + } + + private byte[] getHeaderValue(Object value) { + if (value instanceof String) { + return ((String) value).getBytes(); + } else if (value instanceof Long) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong((Long) value); + return buffer.array(); + } else if (value instanceof Integer) { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt((Integer) value); + return buffer.array(); + } else if (value instanceof Double) { + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES); + buffer.putDouble((Double) value); + return buffer.array(); + } else if (value instanceof byte[]) { + return (byte[]) value; + } + log.debug("Cannot propagate header value of type[{}], skipping... " + + "Supported types: String, Integer, Long, Double, byte[].", value != null ? 
value.getClass() : "null"); + return null; + } + @Override @SuppressWarnings({"unchecked", "rawtypes"}) // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java index 5b35c8e..17272a4 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.kafka; import java.io.IOException; +import java.util.Map; import java.util.Properties; import java.util.stream.StreamSupport; @@ -25,6 +26,7 @@ import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -71,20 +73,30 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { @Test public void kafkaMessageIsConsumedByCamel() throws InterruptedException, IOException { + String propagatedHeaderKey = "PropagatedCustomHeader"; + byte[] propagatedHeaderValue = "propagated header value".getBytes(); + String skippedHeaderKey = "CamelSkippedHeader"; to.expectedMessageCount(5); to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any exchange because autoCommitEnable=true to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, null, null, null, null, null); + to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue); for 
(int k = 0; k < 5; k++) { String msg = "message-" + k; ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg); + data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes())); + data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue)); producer.send(data); } to.assertIsSatisfied(3000); assertEquals(5, StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(), false).count()); + + Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders(); + assertFalse("Should not receive skipped header", headers.containsKey(skippedHeaderKey)); + assertTrue("Should receive propagated header", headers.containsKey(propagatedHeaderKey)); } @Test diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index a7e43da..643c783 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -17,14 +17,17 @@ package org.apache.camel.component.kafka; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; @@ -33,9 +36,14 @@ import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultHeaderFilterStrategy; +import 
org.apache.camel.impl.JndiRegistry; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -48,24 +56,32 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { private static final String TOPIC_BYTES = "testBytes"; private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader"; private static final String GROUP_BYTES = "groupStrings"; + private static final String TOPIC_PROPAGATED_HEADERS = "testPropagatedHeaders"; private static KafkaConsumer<String, String> stringsConsumerConn; private static KafkaConsumer<byte[], byte[]> bytesConsumerConn; @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1") private Endpoint toStrings; + @EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1&partitionKey=1") private Endpoint toStrings2; + @EndpointInject(uri = "kafka:" + TOPIC_INTERCEPTED + "?requestRequiredAcks=-1" + "&interceptorClasses=org.apache.camel.component.kafka.MockProducerInterceptor") private Endpoint toStringsWithInterceptor; + @EndpointInject(uri = "mock:kafkaAck") private MockEndpoint mockEndpoint; + @EndpointInject(uri = "kafka:" + TOPIC_BYTES + "?requestRequiredAcks=-1" + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&" + "keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer") private Endpoint toBytes; + @EndpointInject(uri = "kafka:" + TOPIC_PROPAGATED_HEADERS + "?requestRequiredAcks=-1") + private Endpoint toPropagatedHeaders; + @Produce(uri = "direct:startStrings") private ProducerTemplate stringsTemplate; @@ -78,6 +94,16 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { 
@Produce(uri = "direct:startTraced") private ProducerTemplate interceptedTemplate; + @Produce(uri = "direct:propagatedHeaders") + private ProducerTemplate propagatedHeadersTemplate; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("myStrategy", new MyHeaderFilterStrategy()); + return jndi; + } + @BeforeClass public static void before() { Properties stringsProps = new Properties(); @@ -118,6 +144,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { from("direct:startBytes").to(toBytes).to(mockEndpoint); from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint); + + from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint); } }; } @@ -271,6 +299,88 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { } } + @Test + public void propagatedHeaderIsReceivedByKafka() throws Exception { + String propagatedStringHeaderKey = "PROPAGATED_STRING_HEADER"; + String propagatedStringHeaderValue = "propagated string header value"; + + String propagatedIntegerHeaderKey = "PROPAGATED_INTEGER_HEADER"; + Integer propagatedIntegerHeaderValue = 54545; + + String propagatedLongHeaderKey = "PROPAGATED_LONG_HEADER"; + Long propagatedLongHeaderValue = 5454545454545L; + + String propagatedDoubleHeaderKey = "PROPAGATED_DOUBLE_HEADER"; + Double propagatedDoubleHeaderValue = 43434.545D; + + String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER"; + byte[] propagatedBytesHeaderValue = new byte[]{121, 34, 34, 54, 5, 3, 54, -34}; + + Map<String, Object> camelHeaders = new HashMap<>(); + camelHeaders.put(propagatedStringHeaderKey, propagatedStringHeaderValue); + camelHeaders.put(propagatedIntegerHeaderKey, propagatedIntegerHeaderValue); + camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue); + camelHeaders.put(propagatedDoubleHeaderKey, propagatedDoubleHeaderValue); + camelHeaders.put(propagatedBytesHeaderKey, 
propagatedBytesHeaderValue); + camelHeaders.put("CustomObjectHeader", new Object()); + camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value"); + + CountDownLatch messagesLatch = new CountDownLatch(1); + propagatedHeadersTemplate.sendBodyAndHeaders("Some test message", camelHeaders); + + List<ConsumerRecord<String, String>> records = pollForRecords(stringsConsumerConn, TOPIC_PROPAGATED_HEADERS, messagesLatch); + boolean allMessagesReceived = messagesLatch.await(10_000, TimeUnit.MILLISECONDS); + + assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); + + ConsumerRecord<String, String> record = records.get(0); + Headers headers = record.headers(); + assertNotNull("Kafka Headers should not be null.", headers); + assertEquals("One propagated header is expected.", 5, headers.toArray().length); + assertEquals("Propagated string value received", propagatedStringHeaderValue, + new String(getHeaderValue(propagatedStringHeaderKey, headers))); + assertEquals("Propagated integer value received", propagatedIntegerHeaderValue, + new Integer(ByteBuffer.wrap(getHeaderValue(propagatedIntegerHeaderKey, headers)).getInt())); + assertEquals("Propagated long value received", propagatedLongHeaderValue, + new Long(ByteBuffer.wrap(getHeaderValue(propagatedLongHeaderKey, headers)).getLong())); + assertEquals("Propagated double value received", propagatedDoubleHeaderValue, + new Double(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey, headers)).getDouble())); + assertArrayEquals("Propagated byte array value received", propagatedBytesHeaderValue, getHeaderValue(propagatedBytesHeaderKey, headers)); + } + + @Test + public void headerFilterStrategyCouldBeOverridden() { + KafkaEndpoint kafkaEndpoint = context.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy", KafkaEndpoint.class); + assertIsInstanceOf(MyHeaderFilterStrategy.class, 
kafkaEndpoint.getConfiguration().getHeaderFilterStrategy()); + } + + private byte[] getHeaderValue(String headerKey, Headers headers) { + Header foundHeader = StreamSupport.stream(headers.spliterator(), false) + .filter(header -> header.key().equals(headerKey)) + .findFirst() + .orElse(null); + assertNotNull("Header should be sent", foundHeader); + return foundHeader.value(); + } + + private List<ConsumerRecord<String, String>> pollForRecords(KafkaConsumer<String, String> consumerConn, + String topic, CountDownLatch messagesLatch) { + + List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>(); + consumerConn.subscribe(Collections.singletonList(topic)); + + new Thread(() -> { + while (messagesLatch.getCount() != 0) { + for (ConsumerRecord<String, String> record : consumerConn.poll(100)) { + consumedRecords.add(record); + messagesLatch.countDown(); + } + } + }).start(); + + return consumedRecords; + } + private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn, String topic, String topicInHeader, CountDownLatch messagesLatch) { @@ -323,4 +433,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { } } + private static class MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy { + } + } diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java index f5624b1..e46ad41 100644 --- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java @@ -19,6 +19,7 @@ package 
org.apache.camel.component.kafka.springboot; import java.util.concurrent.ExecutorService; import javax.annotation.Generated; import org.apache.camel.component.kafka.KafkaManualCommitFactory; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.StateRepository; import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon; import org.apache.camel.util.jsse.SSLContextParameters; @@ -751,6 +752,11 @@ public class KafkaComponentConfiguration * increase, 20% random jitter is added to avoid connection storms. */ private Integer reconnectBackoffMaxMs = 1000; + /** + * To use a custom HeaderFilterStrategy to filter header to and from + * Camel message. + */ + private HeaderFilterStrategy headerFilterStrategy; public Boolean getTopicIsPattern() { return topicIsPattern; @@ -1452,5 +1458,14 @@ public class KafkaComponentConfiguration public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) { this.reconnectBackoffMaxMs = reconnectBackoffMaxMs; } + + public HeaderFilterStrategy getHeaderFilterStrategy() { + return headerFilterStrategy; + } + + public void setHeaderFilterStrategy( + HeaderFilterStrategy headerFilterStrategy) { + this.headerFilterStrategy = headerFilterStrategy; + } } } \ No newline at end of file -- To stop receiving notification emails like this one, please contact davscl...@apache.org.