Kafka Component
Available as of Camel 2.13
The kafka: component is used for communicating with Apache Kafka message broker.
Maven users will need to add the following dependency to their pom.xml
for this component:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
kafka:server:port[?options]
Options
Property |
Default |
Description |
zookeeperHost |
|
The zookeeper host to use |
zookeeperPort |
2181 |
The zookeeper port to use |
topic |
|
The topic to use |
groupId |
|
|
partitioner |
|
|
consumerStreams |
10 |
|
clientId |
|
|
zookeeperSessionTimeoutMs |
|
|
zookeeperConnectionTimeoutMs |
|
|
zookeeperSyncTimeMs |
|
|
You can append query options to the URI in the following format, ?option=value&option=value&...
Producer Options
Property |
Default |
Description |
producerType |
|
|
compressionCodec |
|
|
compressedTopics |
|
|
messageSendMaxRetries |
|
|
retryBackoffMs |
|
|
topicMetadataRefreshIntervalMs |
|
|
sendBufferBytes |
|
|
requestRequiredAcks |
|
|
requestTimeoutMs |
|
|
queueBufferingMaxMs |
|
|
queueBufferingMaxMessages |
|
|
queueEnqueueTimeoutMs |
|
|
batchNumMessages |
|
|
serializerClass |
|
|
keySerializerClass |
|
|
Consumer Options
Property |
Default |
Description |
consumerId |
|
|
socketTimeoutMs |
|
|
socketReceiveBufferBytes |
|
|
fetchMessageMaxBytes |
|
|
autoCommitEnable |
|
|
autoCommitIntervalMs |
|
|
queuedMaxMessages |
|
|
rebalanceMaxRetries |
|
|
fetchMinBytes |
|
|
fetchWaitMaxMs |
|
|
rebalanceBackoffMs |
|
|
refreshLeaderBackoffMs |
|
|
autoOffsetReset |
|
|
consumerTimeoutMs |
|
|
Samples
Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
See unit tests of camel-kafka for more examples
Endpoints
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods
See Also