Repository: camel Updated Branches: refs/heads/master 7445dbccb -> cc06080b0
CAMEL-10586: make the kafka endpoint a little easier to use. The producer can now automatic convert to the serializer configured. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ec9b418a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ec9b418a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ec9b418a Branch: refs/heads/master Commit: ec9b418a5b00782f7edeaf766bc2d537ae30269d Parents: 7445dbc Author: Claus Ibsen <[email protected]> Authored: Sat Mar 4 10:26:15 2017 +0100 Committer: Claus Ibsen <[email protected]> Committed: Sat Mar 4 10:26:15 2017 +0100 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaProducer.java | 46 +++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ec9b418a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index b5c192e..f78e369 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -131,11 +131,15 @@ public class KafkaProducer extends DefaultAsyncProducer { final boolean hasPartitionKey = partitionKey != null; // endpoint take precedence over header configuration - final Object messageKey = endpoint.getConfiguration().getKey() != null + Object key = endpoint.getConfiguration().getKey() != null ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY); + final Object messageKey = key != null + ? getMessageKey(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null; final boolean hasMessageKey = messageKey != null; Object msg = exchange.getIn().getBody(); + + // is the message body a list or something that contains multiple values Iterator<Object> iterator = null; if (msg instanceof Iterable) { iterator = ((Iterable<Object>)msg).iterator(); @@ -153,12 +157,16 @@ public class KafkaProducer extends DefaultAsyncProducer { @Override public ProducerRecord next() { + // must convert each entry of the iterator into the value according to the serializer + Object next = msgList.next(); + Object value = getMessageValue(exchange, next, endpoint.getConfiguration().getSerializerClass()); + if (hasPartitionKey && hasMessageKey) { - return new ProducerRecord(msgTopic, partitionKey, messageKey, msgList.next()); + return new ProducerRecord(msgTopic, partitionKey, key, value); } else if (hasMessageKey) { - return new ProducerRecord(msgTopic, messageKey, msgList.next()); + return new ProducerRecord(msgTopic, key, value); } - return new ProducerRecord(msgTopic, msgList.next()); + return new ProducerRecord(msgTopic, value); } @Override @@ -167,14 +175,18 @@ public class KafkaProducer extends DefaultAsyncProducer { } }; } + + // must convert each entry of the iterator into the value according to the serializer + Object value = getMessageValue(exchange, msg, endpoint.getConfiguration().getSerializerClass()); + ProducerRecord record; if (hasPartitionKey && hasMessageKey) { - record = new ProducerRecord(topic, partitionKey, messageKey, msg); + record = new ProducerRecord(topic, partitionKey, key, value); } else if (hasMessageKey) { - record = new ProducerRecord(topic, messageKey, msg); + record = new ProducerRecord(topic, key, value); } else { log.warn("No message key or partition key set"); - record = new ProducerRecord(topic, msg); + record = new ProducerRecord(topic, value); } return Collections.singletonList(record).iterator(); } @@ -222,6 +234,26 @@ public class KafkaProducer extends DefaultAsyncProducer { return true; } + protected Object getMessageKey(Exchange exchange, Object key, String keySerializer) { + Object answer = key; + if (KafkaConstants.KAFKA_DEFAULT_DESERIALIZER.equals(keySerializer)) { + // its string based so ensure key is string as well + answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, key); + } + // TODO: other serializers + return answer; + } + + protected Object getMessageValue(Exchange exchange, Object value, String valueSerializer) { + Object answer = value; + if (KafkaConstants.KAFKA_DEFAULT_DESERIALIZER.equals(valueSerializer)) { + // its string based so ensure value is string as well + answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, value); + } + // TODO: other serializers + return answer; + } + private final class KafkaProducerCallBack implements Callback { private final Exchange exchange;
