Repository: camel
Updated Branches:
  refs/heads/master 7085fd7b2 -> 9584f3851
Update camel-kafka documentation

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9584f385
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9584f385
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9584f385

Branch: refs/heads/master
Commit: 9584f385150e6c25b3b288988eae98c0757583c4
Parents: 7085fd7
Author: Antoine DESSAIGNE <antoine.dessai...@gmail.com>
Authored: Fri Jan 13 13:26:48 2017 +0100
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri Jan 13 13:49:54 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc | 106 ++++++++++---------
 1 file changed, 54 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9584f385/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 52d0aa7..d100a32 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -137,57 +137,65 @@ The Kafka component supports 78 endpoint options which are listed below:
 {% endraw %}
 // endpoint options: END
 
-
-
-
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
 http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs]
 
-### Samples
+### Message headers
 
-#### Camel 2.17 or newer
+#### Consumer headers
 
-Consuming messages:
+The following headers are available when consuming messages from Kafka.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|=======================================================================================================
+| Header constant | Header value | Type | Description
+| KafkaConstants.TOPIC | "kafka.TOPIC" | String | The topic from which the message originated
+| KafkaConstants.PARTITION | "kafka.PARTITION" | Integer | The partition where the message was stored
+| KafkaConstants.OFFSET | "kafka.OFFSET" | Long | The offset of the message
+| KafkaConstants.KEY | "kafka.KEY" | Object | The key of the message, if configured
+|=======================================================================================================
 
-[source,java]
--------------------------------------------------------------------------------------------------
-from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
-    .process(new Processor() {
-        @Override
-        public void process(Exchange exchange)
-            throws Exception {
-            String messageKey = "";
-            if (exchange.getIn() != null) {
-                Message message = exchange.getIn();
-                Integer partitionId = (Integer) message
-                    .getHeader(KafkaConstants.PARTITION);
-                String topicName = (String) message
-                    .getHeader(KafkaConstants.TOPIC);
-                if (message.getHeader(KafkaConstants.KEY) != null)
-                    messageKey = (String) message
-                        .getHeader(KafkaConstants.KEY);
-                Object data = message.getBody();
-
-
-                System.out.println("topicName :: "
-                    + topicName + " partitionId :: "
-                    + partitionId + " messageKey :: "
-                    + messageKey + " message :: "
-                    + data + "\n");
-            }
-        }
-    }).to("log:input");
--------------------------------------------------------------------------------------------------
 
+#### Producer headers
+
+Before sending a message to Kafka you can configure the following headers.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|============================================================================================================================================================================
+| Header constant | Header value | Type | Description
+| KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message, so that all related messages go to the same partition
+| KafkaConstants.TOPIC | "kafka.TOPIC" | String | The topic to send the message to (only read if the `bridgeEndpoint` endpoint parameter is `true`)
+| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined)
+|============================================================================================================================================================================
+
+After the message is sent to Kafka, the following headers are available.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|==============================================================================================================================================================================================
+| Header constant | Header value | Type | Description
+| KafkaConstants.KAFKA_RECORDMETA | "org.apache.kafka.clients.producer.RecordMetadata" | List<RecordMetadata> | The metadata (only configured if the `recordMetadata` endpoint parameter is `true`)
+|==============================================================================================================================================================================================
+### Samples
+
+#### Consuming messages from Kafka
+
+Here is the minimal route you need in order to read messages from Kafka.
+[source,java]
+-------------------------------------------------------------
+from("kafka:localhost:9092?topic=test&groupId=testing")
+    .log("Message received from Kafka : ${body}")
+    .log("    on the topic ${headers[kafka.TOPIC]}")
+    .log("    on the partition ${headers[kafka.PARTITION]}")
+    .log("    with the offset ${headers[kafka.OFFSET]}")
+    .log("    with the key ${headers[kafka.KEY]}")
+-------------------------------------------------------------
+
 When consuming messages from Kafka you can use your own offset management and not delegate this management to Kafka.
 In order to keep the offsets the component needs a `StateRepository` implementation such as `FileStateRepository`.
 This bean should be available in the registry.
 Here is how to use it:
 
 [source,java]
--------------------------------------------------------------------------------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------
 // Create the repository in which the Kafka offsets will be persisted
 FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
 
@@ -207,25 +215,19 @@ camelContext.addRoutes(new RouteBuilder() {
             .to("mock:result");
     }
 });
--------------------------------------------------------------------------------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------
 
 Â
-Producing messages:
+#### Producing messages to Kafka
+Here is the minimal route you need in order to write messages to Kafka.
 [source,java]
-----------------------------------------------------------------------------------------------------------------
-
-from("direct:start").process(new Processor() {
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class);
-        exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
-        exchange.getIn().setHeader(KafkaConstants.KEY, "1");
-    }
-    }).to("kafka:localhost:9092?topic=test");
-----------------------------------------------------------------------------------------------------------------
-
-Â
+----------------------------------------------------------------------------
+from("direct:start")
+    .setBody(constant("Message from Camel"))          // Message to send
+    .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
+    .to("kafka:localhost:9092?topic=test");
+----------------------------------------------------------------------------
 
 ### Endpoints
 
@@ -260,4 +262,4 @@ http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/Pollin
 * link:configuring-camel.html[Configuring Camel]
 * link:message-endpoint.html[Message Endpoint] pattern
 * link:uris.html[URIs]
-* link:writing-components.html[Writing Components]
\ No newline at end of file
+* link:writing-components.html[Writing Components]
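
The producer headers table added above documents `KafkaConstants.KAFKA_RECORDMETA`, but neither of the new samples shows it in use. Below is a minimal sketch, not taken from this commit, of how that header could be read back after sending; the broker address, topic name and class name are illustrative, and it assumes the `recordMetadata` endpoint parameter is set to `true` as described in that table.

[source,java]
----------------------------------------------------------------------------
import java.util.List;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaRecordMetadataRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:start")
            // Key of the message, so that related messages go to the same partition
            .setHeader(KafkaConstants.KEY, constant("Camel"))
            // recordMetadata=true makes the producer keep the metadata of the sent records
            .to("kafka:localhost:9092?topic=test&recordMetadata=true")
            .process(exchange -> {
                // One RecordMetadata entry per record that was sent
                @SuppressWarnings("unchecked")
                List<RecordMetadata> metadata =
                        exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA, List.class);
                for (RecordMetadata meta : metadata) {
                    System.out.println("Stored on topic " + meta.topic()
                            + ", partition " + meta.partition()
                            + ", offset " + meta.offset());
                }
            });
    }
}
----------------------------------------------------------------------------

As with the other samples, this assumes the camel-kafka and kafka-clients dependencies are on the classpath; adapt the endpoint URI to your own broker and topic.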