Modified: websites/production/camel/content/idempotent-consumer.html
==============================================================================
--- websites/production/camel/content/idempotent-consumer.html (original)
+++ websites/production/camel/content/idempotent-consumer.html Tue Mar  7 15:20:59 2017
@@ -86,7 +86,7 @@
        <tbody>
         <tr>
         <td valign="top" width="100%">
-<div class="wiki-content maincontent"><h3 
id="IdempotentConsumer-IdempotentConsumer">Idempotent Consumer</h3><p>The <a 
shape="rect" class="external-link" 
href="http://www.enterpriseintegrationpatterns.com/IdempotentReceiver.html" 
rel="nofollow">Idempotent Consumer</a> from the <a shape="rect" 
href="enterprise-integration-patterns.html">EIP patterns</a> is used to filter 
out duplicate messages.</p><p>This pattern is implemented using the <a 
shape="rect" class="external-link" 
href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/idempotent/IdempotentConsumer.html">IdempotentConsumer</a>
 class. This uses an <a shape="rect" href="expression.html">Expression</a> to 
calculate a unique message ID string for a given message exchange; this ID can 
then be looked up in the <a shape="rect" class="external-link" 
href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/spi/IdempotentRepository.html">IdempotentRepository</a>
 to see if it has been seen before; if it has, the message is consumed; if it has not, then the 
message is processed and the ID is added to the repository.</p><p>The 
Idempotent Consumer essentially acts like a <a shape="rect" 
href="message-filter.html">Message Filter</a> to filter out 
duplicates.</p><p>Camel will add the message id eagerly to the repository to 
detect duplication also for Exchanges currently in progress.<br clear="none"> 
On completion Camel will remove the message id from the repository if the 
Exchange failed, otherwise it stays there.</p><p>Camel provides the following 
Idempotent Consumer implementations:</p><ul 
class="alternate"><li>MemoryIdempotentRepository</li><li><a shape="rect" 
href="file2.html">FileIdempotentRepository</a></li><li><a shape="rect" 
href="hazelcast-component.html">HazelcastIdempotentRepository</a> 
(<strong>Available as of Camel 2.8</strong>)</li><li><a shape="rect" 
href="sql-component.html">JdbcMessageIdRepository</a> (<strong>Available as of 
Camel 2.7</strong>)</li>
<li><a shape="rect" href="jpa.html">JpaMessageIdRepository</a></li><li><p><a 
shape="rect" href="infinispan.html">InfinispanIdempotentRepository</a> 
(<strong>Available as of Camel 2.13.0)</strong></p></li><li><p><a shape="rect" 
href="jcache.html">JCacheIdempotentRepository</a><strong>&#160;(<strong>Available
 as of Camel 2.17.0)</strong></strong></p></li><li><p><a shape="rect" 
href="spring.html">SpringCacheIdempotentRepository</a>&#160;<strong>(<strong>Available
 as of Camel 2.17.1)</strong></strong><strong><strong><br 
clear="none"></strong></strong></p></li><li><p><strong><strong><a shape="rect" 
href="ehcache.html">EhcacheIdempotentRepository</a>&#160;<strong>(<strong>Available
 as of Camel 2.18.0)</strong></strong><br 
clear="none"></strong></strong></p></li></ul><h3 
id="IdempotentConsumer-Options">Options</h3><p>The Idempotent Consumer has the 
following options:</p><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>eager</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Eager controls whether Camel adds the message to the 
repository before or after the exchange has been processed. If enabled, Camel 
adds it before processing and can detect duplicate messages even while messages 
are still in progress. If disabled, Camel only detects duplicates once a 
message has been processed successfully.</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>messageIdRepositoryRef</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>null</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>A reference to an 
<code>IdempotentRepository</code> to look up in the registry. This option is 
mandatory when using XML DSL.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>skipDuplicate</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.8:</strong> Sets whether to skip 
duplicate messages. If set to <code>false</code>, the message continues to be 
routed, but the <a shape="rect" href="exchange.html">Exchange</a> is marked as 
a duplicate by having the <code>Exchange.DUPLICATE_MESSAGE</code> exchange 
property set to <code>Boolean.TRUE</code>.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>removeOnFailure</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.9:</strong> Sets whether to remove the 
id of an Exchange that failed.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">completionEager</td><td colspan="1" rowspan="1" 
class="confluenceTd">false</td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.16:</strong> Sets whether to complete 
the idempotent consumer eagerly or when the exchange is done.</p><p>If this 
option is true (complete eagerly), the idempotent consumer triggers its 
completion when the exchange reaches the end of the idempotent consumer 
block. If the exchange continues to be routed after the block ends, whatever 
happens there does not affect the state.</p><p>If this option is false (the 
default), the idempotent consumer completes when the exchange is done being 
routed. If the exchange continues to be routed after the block ends, whatever 
happens there also affects the state. For example, if the exchange fails due 
to an exception, the state of the idempotent consumer will be a 
rollback.</p></td></tr></tbody></table></div><h3 
id="IdempotentConsumer-Usingthe"><strong>Using the <a shape="rect" 
href="fluent-builders.html">Fluent Builders</a></strong></h3><p>The following example will use the 
header <strong>myMessageId</strong> to filter out duplicates</p><div 
class="code panel pdl" style="border-width: 1px;"><div class="codeContent 
panelContent pdl">
+<div class="wiki-content maincontent"><h3 
id="IdempotentConsumer-IdempotentConsumer">Idempotent Consumer</h3><p>The <a 
shape="rect" class="external-link" 
href="http://www.enterpriseintegrationpatterns.com/IdempotentReceiver.html" 
rel="nofollow">Idempotent Consumer</a> from the <a shape="rect" 
href="enterprise-integration-patterns.html">EIP patterns</a> is used to filter 
out duplicate messages.</p><p>This pattern is implemented using the <a 
shape="rect" class="external-link" 
href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/idempotent/IdempotentConsumer.html">IdempotentConsumer</a>
 class. This uses an <a shape="rect" href="expression.html">Expression</a> to 
calculate a unique message ID string for a given message exchange; this ID can 
then be looked up in the <a shape="rect" class="external-link" 
href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/spi/IdempotentRepository.html">IdempotentRepository</a>
 to see if it has been seen before; if it has, the message is consumed; if it has not, then the 
message is processed and the ID is added to the repository.</p><p>The 
Idempotent Consumer essentially acts like a <a shape="rect" 
href="message-filter.html">Message Filter</a> to filter out 
duplicates.</p><p>Camel will add the message id eagerly to the repository to 
detect duplication also for Exchanges currently in progress.<br clear="none"> 
On completion Camel will remove the message id from the repository if the 
Exchange failed, otherwise it stays there.</p><p>Camel provides the following 
Idempotent Consumer implementations:</p><ul 
class="alternate"><li>MemoryIdempotentRepository</li><li><a shape="rect" 
href="file2.html">FileIdempotentRepository</a></li><li><a shape="rect" 
href="hazelcast-component.html">HazelcastIdempotentRepository</a> 
(<strong>Available as of Camel 2.8</strong>)</li><li><a shape="rect" 
href="sql-component.html">JdbcMessageIdRepository</a> (<strong>Available as of 
Camel 2.7</strong>)</li>
<li><a shape="rect" href="jpa.html">JpaMessageIdRepository</a></li><li><p><a 
shape="rect" href="infinispan.html">InfinispanIdempotentRepository</a> 
(<strong>Available as of Camel 2.13.0)</strong></p></li><li><p><a shape="rect" 
href="jcache.html">JCacheIdempotentRepository</a><strong>&#160;(<strong>Available
 as of Camel 2.17.0)</strong></strong></p></li><li><p><a shape="rect" 
href="spring.html">SpringCacheIdempotentRepository</a>&#160;<strong>(<strong>Available
 as of Camel 2.17.1)</strong></strong><strong><strong><br 
clear="none"></strong></strong></p></li><li><p><a shape="rect" 
href="ehcache.html">EhcacheIdempotentRepository</a><strong><strong>&#160;<strong>(<strong>Available
 as of Camel 2.18.0)</strong></strong></strong></strong></p></li><li><a 
shape="rect" href="kafka.html">KafkaIdempotentRepository</a> (<strong>Available 
as of Camel 2.19.0)</strong></li></ul><h3 
id="IdempotentConsumer-Options">Options</h3><p>The Idempotent Consumer has the 
following options:</p><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>eager</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Eager controls whether Camel adds the message to the 
repository before or after the exchange has been processed. If enabled, Camel 
adds it before processing and can detect duplicate messages even while messages 
are still in progress. If disabled, Camel only detects duplicates once a 
message has been processed successfully.</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>messageIdRepositoryRef</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>null</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>A reference to an 
<code>IdempotentRepository</code> to look up in the registry. This option is 
mandatory when using XML DSL.</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>skipDuplicate</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><strong>Camel 2.8:</strong> Sets whether to 
skip duplicate messages. If set to <code>false</code>, the message continues to be 
routed, but the <a shape="rect" href="exchange.html">Exchange</a> is marked as 
a duplicate by having the <code>Exchange.DUPLICATE_MESSAGE</code> exchange 
property set to <code>Boolean.TRUE</code>.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>removeOnFailure</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.9:</strong> Sets whether to remove the 
id of an Exchange that failed.</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd">completionEager</td><td colspan="1" 
rowspan="1" class="confluenceTd">false</td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.16:</strong> Sets whether to complete 
the idempotent consumer eagerly or when the exchange is done.</p><p>If this 
option is true (complete eagerly), the idempotent consumer triggers its 
completion when the exchange reaches the end of the idempotent consumer 
block. If the exchange continues to be routed after the block ends, whatever 
happens there does not affect the state.</p><p>If this option is false (the 
default), the idempotent consumer completes when the exchange is done being 
routed. If the exchange continues to be routed after the block ends, whatever 
happens there also affects the state. For example, if the exchange fails due 
to an exception, the state of the idempotent consumer will be a 
rollback.</p></td></tr></tbody></table></div><h3 id="IdempotentConsumer-Usingthe"><strong>Using the <a shape="rect" 
href="fluent-builders.html">Fluent Builders</a></strong></h3><p>The following 
example will use the header <strong>myMessageId</strong> to filter out 
duplicates</p><div class="code panel pdl" style="border-width: 1px;"><div 
class="codeContent panelContent pdl">
 <script class="brush: java; gutter: false; theme: Default" 
type="syntaxhighlighter"><![CDATA[
 RouteBuilder builder = new RouteBuilder() {
     public void configure() {

Modified: websites/production/camel/content/kafka.html
==============================================================================
--- websites/production/camel/content/kafka.html (original)
+++ websites/production/camel/content/kafka.html Tue Mar  7 15:20:59 2017
@@ -102,13 +102,16 @@
 </div></div><h3 id="Kafka-URIformat">URI format</h3><div class="code panel 
pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
 <script class="brush: java; gutter: false; theme: Default" 
type="syntaxhighlighter"><![CDATA[kafka:server:port[?options]
 ]]></script>
-</div></div><p>&#160;</p><h3 id="Kafka-Options(Camel2.16orolder)">Options 
(Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div 
class="table-wrap">
- <table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>zookeeperHost</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>The zookeeper host to use</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperPort</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>2181</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>The zookeeper port to 
use</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">zookeeperConnect</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd"><strong>Camel 2.13.3/2.14.1:</strong> If in use, then 
zookeeperHost/zookeeperPort is not used.</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>topic</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>The topic to use</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>groupId</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>partitioner</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>consumerStreams</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">10</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>clientId</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" c
 lass="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>zookeeperSessionTimeoutMs</p></td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>zookeeperConnectionTimeoutMs</p></td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>zookeeperSyncTimeMs</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">consumersCount</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">1</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.15.0:</strong> The number 
of consumers that connect to kafka server</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">batchSize</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">100</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.15.0: </strong>The batchSize that the 
BatchingConsumerTask processes once, deprecated since <strong>2.17.1</strong>, 
removed<strong><br clear="none"></strong>since 
<strong>2.18.0</strong></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">barrierAwaitTimeoutMs</span></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">10000</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.15.0: </strong>If the exchanges 
processed by the BatchingConsumerTask exceed the batchSize, it will wait for 
<span style="color: rgb(51,51,51);">barrierAwaitTimeoutMs</span>, deprecated since <strong>2.17.1</strong>, removed 
since&#160;<strong>2.18.0</strong><strong>.<br 
clear="none"></strong></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">bridgeEndpoint</td><td colspan="1" rowspan="1" 
class="confluenceTd">false</td><td colspan="1" rowspan="1" 
class="confluenceTd">Camel 2.16.0: If bridgeEndpoint is true, the producer will 
ignore the topic header setting of the message.</td></tr></tbody></table>
-</div></div><p>You can append query options to the URI in the following 
format, <code>?option=value&amp;option=value&amp;...</code></p><h3 
id="Kafka-ProducerOptions(Camel2.16orolder)">Producer Options&#160;(Camel 2.16 
or older)</h3><div class="confluenceTableSmall"><div class="table-wrap">
- <table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>producerType</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sync (Taken from native KafkaProducer 
class)</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>sync - send 
message/batch immediately, and wait until response is received</p><p>async - 
queue the message/batch to send. There is a thread per broker (Kafka node) 
which polls from this queue upon <span>queueBufferingMaxMs or 
<span>batchNumMessages</span></span></p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd">compressionCodec</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1"
  class="confluenceTd">compressedTopics</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">messageSendMaxRetries</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">retryBackoffMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">topicMetadataRefreshIntervalMs</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">sendBufferBytes</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="
 1" rowspan="1" class="confluenceTd">requestRequiredAcks</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">requestTimeoutMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">queueBufferingMaxMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">queueBufferingMaxMessages</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">queueEnqueueTimeoutMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr
 ><tr><td colspan="1" rowspan="1" class="confluenceTd">batchNumMessages</td><td 
 >colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" 
 >rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" 
 >rowspan="1" class="confluenceTd">serializerClass</td><td colspan="1" 
 >rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
 >class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
 >class="confluenceTd">keySerializerClass</td><td colspan="1" rowspan="1" 
 >class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
 >class="confluenceTd">&#160;</td></tr></tbody></table>
-</div></div><h3 id="Kafka-ConsumerOptions(Camel2.16orolder)">Consumer 
Options&#160;(Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div 
class="table-wrap">
- <table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>consumerId</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">socketTimeoutMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">socketReceiveBufferBytes</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">fetchMessageMaxBytes</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">autoCommitEnable</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">autoCommitIntervalMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">queuedMaxMessages</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">rebalanceMaxRetries</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">fetchMinBytes</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">fetchWaitMaxMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">rebalanceBackoffMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">refreshLeaderBackoffMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">autoOffsetReset</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">consumerTimeoutMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr></tbody></table>
-</div></div><h3 id="Kafka-Options(Camel2.17ornewer)">Options (Camel 2.17 or 
newer)</h3><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh">Property</th><th colspan="1" rowspan="1" 
class="confluenceTh">Default</th><th colspan="1" rowspan="1" 
class="confluenceTh">Description</th></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span class="s1">topic</span></p></td><td 
colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" 
rowspan="1" class="confluenceTd">Topic to use. On the 
<strong>consumer</strong> side you can also specify a comma-separated list of 
topics.</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p 
class="p1"><span class="s1">groupId</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span class="s1">consumerStreams</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">10</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span class="s1">clientId</span></p></td><td 
colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p class="p1"><span 
class="s1">consumersCount</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">1</td><td colspan="1" rowspan="1" class="confluenceTd"><p 
class="p1"><span class="s1">The number of consumers that connect to kafka 
server</span></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span 
class="s1">batchSize</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">100</td><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span class="s1">Commit Size if auto commit 
is false</span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p 
class="p1"><span class="s1">bridgeEndpoint</span></p></td><td colspan="1" 
rowspan="1" class="confluenceTd">false</td><td colspan="1" rowspan="1" 
class="confluenceTd"><span>If the bridgeEndpoint is true, the producer will 
ignore the topic header setting of the 
message.</span></td></tr></tbody></table></div><p>&#160;</p><h3 
id="Kafka-ProducerOptions(Camel2.17ornewer)">Producer Options&#160;(Camel 2.17 
or newer)</h3><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh">Property</th><th colspan="1" rowspan="1" 
class="confluenceTh">Default &amp; Description Reference</th></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>serializerClass</p></td><td 
colspan="1" rowspan="46" class="confluenceTd"><p class="p1"><span 
class="s1"><span class="nolink"><a shape="rect" class="external-link" 
href="http://kafka.apache.org/documentation.html#producerconfigs">http://kafka.apache.org/documentation.html#producerconfigs</a></span></span></p><p 
class="p1"><span class="s1"><span class="nolink">serializerClass : <span 
class="pl-s">org.apache.kafka.common.serialization.StringSerializer</span></span></span></p><p
 class="p1"><span class="s1"><span class="nolink">keySerializerClass : <span 
class="s1"><span class="nolink"><span 
class="pl-s">org.apache.kafka.common.serialization.StringSerializer</span></span></span></span></span></p><p>partitioner
 : <span 
class="pl-s">org.apache.kafka.clients.producer.internals.DefaultPartitioner<span
 class="pl-pds">&#160;</span></span></p><p>&#160;</p><p>&#160;</p><p 
class="p1"><span class="s1"><span class="nolink"><br 
clear="none"></span></span></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>keySerializerClass</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>requestRequiredAcks&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>bufferMemorySize&#160;</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>compressionCodec&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>retries&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeyPassword</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslKeystoreLocation</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeystorePassword</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslTruststoreLocation</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTruststorePassword</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>producerBatchSize&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>connectionMaxIdleMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>lingerMs&#160;</p></td></tr><tr><td colspan="1" 
 rowspan="1" class="confluenceTd"><p>maxBlockMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>maxRequestSize&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>partitioner&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>receiveBufferBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>requestTimeoutMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>saslKerberosServiceName</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>saslMechanism <strong>(from 
Camel 2.18)</strong></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>securityProtocol</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sendBufferBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslEnabledProtocols&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreType&#160;</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>sslProtocol&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslProvider</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTruststoreType</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>maxInFlightRequest&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>metadataMaxAgeMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>metricReporters</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>noOfMetricsSample&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>metricsSampleWindowMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>reconnectBackoffMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>retryBackoffMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>kerberosInitCmd&#160;</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosBeforeReloginMinTime&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosRenewJitter&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosRenewWindowFactor&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslCipherSuites</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslEndpointAlgorithm</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeymanagerAlgorithm&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTrustmanagerAlgorithm&#160;</p></td></tr></tbody></table></div><h3
 id="Kafka-ConsumerOptions(Camel2.17ornewer)">Consumer Options&#160;(Camel 2.17 
or newer)</h3><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh">Property</th><th colspan="1" rowspan="1" 
class="confluenceTh">Default &amp; Description Re
 ference</th></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="45" 
class="confluenceTd"><p class="p1"><span class="s1"><span class="nolink"><a 
shape="rect" class="external-link" 
href="http://kafka.apache.org/documentation.html#newconsumerconfigs">http://kafka.apache.org/documentation.html#newconsumerconfigs</a></span></span></p><p
 class="p1"><span class="s1"><span class="nolink">keyDeserializer : <span 
class="pl-s">org.apache.kafka.common.serialization.StringDeserializer</span></span></span></p><p
 class="p1"><span class="s1"><span class="nolink">valueDeserializer : <span 
class="s1"><span class="nolink"><span 
class="pl-s">org.apache.kafka.common.serialization.StringDeserializer</span></span></span></span></span></p><p>partitionAssignor
 : <span 
class="pl-s">org.apache.kafka.clients.consumer.RangeAssignor</span></p><p>&#160;</p><p
 class="p1"><span class="s1"><span class="nolink"><span class="s1"><span 
class="nolink"><span class="pl-s"><
 br clear="none"></span></span></span></span></span></p><p class="p1"><span 
class="s1"><span class="nolink"><br 
clear="none"></span></span></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>keyDeserializer</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>valueDeserializer</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>fetchMinBytes&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>groupId</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>heartbeatIntervalMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>maxPartitionFetchBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sessionTimeoutMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslKeyPassword</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeystoreLocation</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluence
 Td"><p>sslKeystorePassword</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTruststoreLocation</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslTruststorePassword</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>autoOffsetReset&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>connectionMaxIdleMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>autoCommitEnable&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>partitionAssignor&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>receiveBufferBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>consumerRequestTimeoutMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>saslKerberosServiceName</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p><span>saslMechanism 
</span><strong>(from Camel 2.18)</strong>
 </p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>securityProtocol</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sendBufferBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslEnabledProtocols&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeystoreType&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslProtocol&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslProvider</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslTruststoreType</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>autoCommitIntervalMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>checkCrcs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>clientId</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>fetchWaitMaxMs&#160;</p></td></tr><tr><td 
colspan=
 "1" rowspan="1" 
class="confluenceTd"><p>metadataMaxAgeMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>metricReporters</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>noOfMetricsSample&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>metricsSampleWindowMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>reconnectBackoffMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>retryBackoffMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>kerberosInitCmd&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosBeforeReloginMinTime&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosRenewJitter&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosRenewWindowFactor&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>sslCipherSuites
 </p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>sslEndpointAlgorithm</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>sslKeymanagerAlgorithm&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTrustmanagerAlgorithm&#160;</p></td></tr></tbody></table></div><p>&#160;</p><h3
 id="Kafka-Samples">Samples</h3><h4 id="Kafka-Camel2.16orolder.1">Camel 2.16 or 
older</h4><p>Consuming messages:</p><div class="code panel pdl" 
style="border-width: 1px;"><div class="codeContent panelContent pdl">
+</div></div><p>&#160;</p><h3 id="Kafka-Options(Camel2.16orolder)">Options 
(Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div 
class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" 
rowspan="1" class="confluenceTh"><p>Property</p></th><th colspan="1" 
rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>zookeeperHost</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>The zookeeper host to use</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperPort</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>2181</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>The zookeeper port to 
use</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">zookeeperConnect</td><td colspan="1" rowspan="1" 
class="conf
 luenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd"><strong>Camel 2.13.3/2.14.1:</strong> If set, 
zookeeperHost and zookeeperPort are not used.</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>topic</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>The topic to use</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>groupId</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>partitioner</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>consumerStreams</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">10</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><
 td colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td><td 
colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>zookeeperSessionTimeoutMs</p></td><td 
colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>zookeeperConnectionTimeoutMs</p></td><td 
colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>zookeeperSyncTimeMs</p></td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">consumersCount</span></p></td><td colspan="1" rowspan="1" 
class="confl
 uenceTd"><p><span style="color: rgb(51,51,51);">1</span></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.15.0:</strong> 
The number of consumers that connect to the Kafka server</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">batchSize</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">100</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.15.0: </strong>The batch size that the 
BatchingConsumerTask processes at once. Deprecated since <strong>2.17.1</strong>, 
removed since 
<strong>2.18.0</strong>.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">barrierAwaitTimeoutMs</span></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><span style="color: 
rgb(51,51,51);">10000</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>
 <strong>Camel 2.15.0: </strong>If the exchanges processed by the BatchingConsumerTask 
exceed the batchSize, it will wait for <span style="color: 
rgb(51,51,51);">barrierAwaitTimeoutMs</span>. Deprecated since 
<strong>2.17.1</strong>, removed since&#160;<strong>2.18.0</strong>.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">bridgeEndpoint</td><td colspan="1" rowspan="1" 
class="confluenceTd">false</td><td colspan="1" rowspan="1" 
class="confluenceTd">Camel 2.16.0: If bridgeEndpoint is true, the producer will 
ignore the topic header setting of the 
message.</td></tr></tbody></table></div></div>
+
+
+<p>You can append query options to the URI in the following format: 
<code>?option=value&amp;option=value&amp;...</code></p><h3 
id="Kafka-ProducerOptions(Camel2.16orolder)">Producer Options&#160;(Camel 2.16 
or older)</h3><div class="confluenceTableSmall"><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>producerType</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sync (Taken from native KafkaProducer 
class)</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>sync - send 
the message/batch immediately and wait until a response is received</p><p>async - 
queue the message/batch for sending. There is one thread per broker (Kafka node) 
that polls this queue when <span>queueBufferingMaxMs elapses or 
<span>batchNumMessages is reached</span></span></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">compressionCodec</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">compressedTopics</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">messageSendMaxRetries</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">retryBackoffMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">topicMetadataRefreshIntervalMs</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td><td
  colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd">sendBufferBytes</td><td 
colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd">requestRequiredAcks</td><td colspan="1" 
rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">requestTimeoutMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">queueBufferingMaxMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">queueBufferingMaxMessages</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#
 160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">queueEnqueueTimeoutMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">batchNumMessages</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">serializerClass</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">keySerializerClass</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr></tbody></table></div></div>
+
+
+<h3 id="Kafka-ConsumerOptions(Camel2.16orolder)">Consumer Options&#160;(Camel 
2.16 or older)</h3><div class="confluenceTableSmall"><div 
class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" 
rowspan="1" class="confluenceTh"><p>Property</p></th><th colspan="1" 
rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>consumerId</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">socketTimeoutMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">socketReceiveBufferBytes</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="conf
 luenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">fetchMessageMaxBytes</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">autoCommitEnable</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">autoCommitIntervalMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">queuedMaxMessages</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">rebalanceMaxRetries</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" c
 lass="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">fetchMinBytes</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">fetchWaitMaxMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">rebalanceBackoffMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">refreshLeaderBackoffMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">autoOffsetReset</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" c
 lass="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">consumerTimeoutMs</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr></tbody></table></div></div>
+
+
+<h3 id="Kafka-Options(Camel2.17ornewer)">Options (Camel 2.17 or 
newer)</h3><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh">Property</th><th colspan="1" rowspan="1" 
class="confluenceTh">Default</th><th colspan="1" rowspan="1" 
class="confluenceTh">Description</th></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span class="s1">topic</span></p></td><td 
colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" 
rowspan="1" class="confluenceTd">Topic to use. On the 
<strong>consumer</strong> side you can also specify a comma-separated list of 
topics.</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p 
class="p1"><span class="s1">groupId</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span class="s1">consumerStreams
 </span></p></td><td colspan="1" rowspan="1" class="confluenceTd">10</td><td 
colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span 
class="s1">clientId</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" 
class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span 
class="s1">consumersCount</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">1</td><td colspan="1" rowspan="1" class="confluenceTd"><p 
class="p1"><span class="s1">The number of consumers that connect to the Kafka 
server</span></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span 
class="s1">batchSize</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">100</td><td colspan="1" rowspan="1" 
class="confluenceTd"><p class="p1"><span class="s1">Commit size if auto commit 
is false</span></p></td></tr><tr
 ><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span 
class="s1">bridgeEndpoint</span></p></td><td colspan="1" rowspan="1" 
class="confluenceTd">false</td><td colspan="1" rowspan="1" 
class="confluenceTd"><span>If the bridgeEndpoint is true, the producer will 
ignore the topic header setting of the 
message.</span></td></tr></tbody></table></div><p>&#160;</p><h3 
id="Kafka-ProducerOptions(Camel2.17ornewer)">Producer Options&#160;(Camel 
2.17 or newer)</h3><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh">Property</th><th colspan="1" rowspan="1" 
class="confluenceTh">Default &amp; Description Reference</th></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>serializerClass</p></td><td 
colspan="1" rowspan="46" class="confluenceTd"><p class="p1"><span 
class="s1"><span class="nolink"><a shape="rect" class="external-link" 
href="http://kafka.apache.org/documentation.html#producerconfigs">http://kafka.apache
 .org/documentation.html#producerconfigs</a></span></span></p><p 
class="p1"><span class="s1"><span class="nolink">serializerClass : <span 
class="pl-s">org.apache.kafka.common.serialization.StringSerializer</span></span></span></p><p
 class="p1"><span class="s1"><span class="nolink">keySerializerClass : <span 
class="s1"><span class="nolink"><span 
class="pl-s">org.apache.kafka.common.serialization.StringSerializer</span></span></span></span></span></p><p>partitioner
 : <span 
class="pl-s">org.apache.kafka.clients.producer.internals.DefaultPartitioner<span
 class="pl-pds">&#160;</span></span></p><p>&#160;</p><p>&#160;</p><p 
class="p1"><span class="s1"><span class="nolink"><br 
clear="none"></span></span></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>keySerializerClass</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>requestRequiredAcks&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>bufferMemorySize&#160;</p></td></tr><tr>
 <td colspan="1" rowspan="1" 
class="confluenceTd"><p>compressionCodec&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>retries&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeyPassword</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslKeystoreLocation</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeystorePassword</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslTruststoreLocation</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTruststorePassword</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>producerBatchSize&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>connectionMaxIdleMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>lingerMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" 
 class="confluenceTd"><p>maxBlockMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>maxRequestSize&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>partitioner&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>receiveBufferBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>requestTimeoutMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>saslKerberosServiceName</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>saslMechanism <strong>(from 
Camel 2.18)</strong></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>securityProtocol</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sendBufferBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslEnabledProtocols&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeystoreType&#160;</p></td></tr><tr><t
 d colspan="1" rowspan="1" 
class="confluenceTd"><p>sslProtocol&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslProvider</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTruststoreType</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>maxInFlightRequest&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>metadataMaxAgeMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>metricReporters</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>noOfMetricsSample&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>metricsSampleWindowMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>reconnectBackoffMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>retryBackoffMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>kerberosInitCmd&#160;</p></td></tr><tr><td 
cols
 pan="1" rowspan="1" 
class="confluenceTd"><p>kerberosBeforeReloginMinTime&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosRenewJitter&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosRenewWindowFactor&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslCipherSuites</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslEndpointAlgorithm</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeymanagerAlgorithm&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTrustmanagerAlgorithm&#160;</p></td></tr></tbody></table></div><h3
 id="Kafka-ConsumerOptions(Camel2.17ornewer)">Consumer Options&#160;(Camel 2.17 
or newer)</h3><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh">Property</th><th colspan="1" rowspan="1" 
class="confluenceTh">Default &amp; Description Reference</th>
 </tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="45" 
class="confluenceTd"><p class="p1"><span class="s1"><span class="nolink"><a 
shape="rect" class="external-link" 
href="http://kafka.apache.org/documentation.html#newconsumerconfigs">http://kafka.apache.org/documentation.html#newconsumerconfigs</a></span></span></p><p
 class="p1"><span class="s1"><span class="nolink">keyDeserializer : <span 
class="pl-s">org.apache.kafka.common.serialization.StringDeserializer</span></span></span></p><p
 class="p1"><span class="s1"><span class="nolink">valueDeserializer : <span 
class="s1"><span class="nolink"><span 
class="pl-s">org.apache.kafka.common.serialization.StringDeserializer</span></span></span></span></span></p><p>partitionAssignor
 : <span 
class="pl-s">org.apache.kafka.clients.consumer.RangeAssignor</span></p><p>&#160;</p><p
 class="p1"><span class="s1"><span class="nolink"><span class="s1"><span 
class="nolink"><span class="pl-s"><br clear="no
 ne"></span></span></span></span></span></p><p class="p1"><span 
class="s1"><span class="nolink"><br 
clear="none"></span></span></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>keyDeserializer</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>valueDeserializer</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>fetchMinBytes&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>groupId</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>heartbeatIntervalMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>maxPartitionFetchBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sessionTimeoutMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslKeyPassword</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeystoreLocation</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslKe
 ystorePassword</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTruststoreLocation</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslTruststorePassword</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>autoOffsetReset&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>connectionMaxIdleMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>autoCommitEnable&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>partitionAssignor&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>receiveBufferBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>consumerRequestTimeoutMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>saslKerberosServiceName</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p><span>saslMechanism 
</span><strong>(from Camel 2.18)</strong></p></td></t
 r><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>securityProtocol</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sendBufferBytes&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslEnabledProtocols&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslKeystoreType&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslProtocol&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslProvider</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>sslTruststoreType</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>autoCommitIntervalMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>checkCrcs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>clientId</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>fetchWaitMaxMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan=
 "1" class="confluenceTd"><p>metadataMaxAgeMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>metricReporters</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>noOfMetricsSample&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>metricsSampleWindowMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>reconnectBackoffMs&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>retryBackoffMs&#160;</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>kerberosInitCmd&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosBeforeReloginMinTime&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosRenewJitter&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>kerberosRenewWindowFactor&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>sslCipherSuites</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>sslEndpointAlgorithm</p></td></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p>sslKeymanagerAlgorithm&#160;</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p>sslTrustmanagerAlgorithm&#160;</p></td></tr></tbody></table></div><p>&#160;</p><h3
 id="Kafka-Samples">Samples</h3><h4 id="Kafka-Camel2.16orolder.1">Camel 2.16 or 
older</h4><p>Consuming messages:</p><div class="code panel pdl" 
style="border-width: 1px;"><div class="codeContent panelContent pdl">
 <script class="brush: java; gutter: false; theme: Default" 
type="syntaxhighlighter"><![CDATA[from(&quot;kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&quot;).to(&quot;log:input&quot;);
 ]]></script>
 </div></div><p>Producing messages:</p><p>See unit tests of camel-kafka for 
more examples</p><h4 id="Kafka-Camel2.17ornewer.1">Camel 2.17 or 
newer</h4><p>Consuming messages:</p><div class="code panel pdl" 
style="border-width: 1px;"><div class="codeContent panelContent pdl">
@@ -149,6 +152,42 @@
                                        }
                                
}).to(&quot;kafka:localhost:9092?topic=test&quot;);
 ]]></script>
+</div></div><h3 
id="Kafka-UsingtheKafkaidempotentrepository(AvailablefromCamel2.19)">Using the 
Kafka idempotent repository (Available from Camel 2.19)</h3><p>The 
<code>camel-kafka</code> library provides a Kafka topic-based idempotent 
repository. This repository broadcasts all changes to idempotent state 
(add/remove) to a Kafka topic, and populates a local in-memory cache for each 
repository's process instance through event sourcing.</p><p>The topic used must 
be unique per idempotent repository instance. The mechanism places no 
requirement on the number of topic partitions, as the repository consumes 
from all partitions at the same time. Nor does it place any requirement on 
the replication factor of the topic.</p><p>Each repository instance that 
uses the topic (typically on different machines running in parallel) 
controls its own consumer group, so in a cluster of 10 Camel processes using 
the same topic, each will control its own offset.</p><p>On startup, the 
instance subscribes to the topic and rewinds the offset to the 
beginning, rebuilding the cache to the latest state. The cache will not be 
considered warmed up until one poll of <code>pollDurationMs</code> in length 
returns 0 records. Startup will not be completed until either the cache has 
warmed up, or 30 seconds go by; if the latter happens the idempotent repository 
may be in an inconsistent state until its consumer catches up to the end of the 
topic.</p><p>A <code>KafkaIdempotentRepository</code> has the following 
properties:</p><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh">Property</th><th colspan="1" rowspan="1" 
class="confluenceTh">Description</th></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><pre><span style="color: 
rgb(102,14,122);">topic</span></pre></td><td colspan="1" rowspan="1" 
class="confluenceTd">The name of the Kafka topic to use to broadcast changes. 
(required)</td></tr><tr><td 
 colspan="1" rowspan="1" 
class="confluenceTd"><code>bootstrapServers</code></td><td colspan="1" 
rowspan="1" class="confluenceTd">The <code>bootstrap.servers</code> property on 
the internal Kafka producer and consumer. Use this as shorthand if not setting 
<code>consumerConfig</code> and <code>producerConfig</code>. If used, this 
component will apply sensible default configurations for the producer and 
consumer.</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><code>producerConfig</code></td><td colspan="1" 
rowspan="1" class="confluenceTd">Sets the properties that will be used by the 
Kafka producer that broadcasts changes. Overrides 
<code>bootstrapServers</code>, so must define the Kafka 
<code>bootstrap.servers</code> property itself</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><code>consumerConfig</code></td><td 
colspan="1" rowspan="1" class="confluenceTd">Sets the properties that will be 
used by the Kafka consumer that populates the cache from the topic.
  Overrides <code>bootstrapServers</code>, so must define the Kafka 
<code>bootstrap.servers</code> property itself</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><code>maxCacheSize</code></td><td colspan="1" 
rowspan="1" class="confluenceTd">How many of the most recently used keys should 
be stored in memory (default 1000).</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><pre><span style="color: 
rgb(0,0,0);">pollDurationMs</span></pre></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>The poll duration of the Kafka consumer. The local 
cache is updated immediately; this value affects how far other peers in the 
cluster, which update their caches from the topic, lag behind the idempotent 
consumer instance that issued the cache action message.</p><p>The default 
value is 100 ms. If setting this value explicitly, be aware that there is a 
tradeoff between remote cache liveness and the volume of network traffic 
between this repository's consumer and the Kafka 
brokers.</p></td></tr></tbody></table></div><p>The repository can be 
instantiated by defining the topic and <code>bootstrapServers</code>, or the 
<code>producerConfig</code> and <code>consumerConfig</code> property sets can 
be explicitly defined to enable features such as SSL/SASL.</p><p>To use, this 
repository must be placed in the Camel registry, either manually or by 
registration as a bean in Spring/Blueprint, as it is <code>CamelContext</code> 
aware.</p><p>Sample usage is as follows:</p><div class="code panel pdl" 
style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default" 
type="syntaxhighlighter"><![CDATA[KafkaIdempotentRepository 
kafkaIdempotentRepository = new 
KafkaIdempotentRepository(&quot;idempotent-db-inserts&quot;, 
&quot;localhost:9091&quot;);
+
+SimpleRegistry registry = new SimpleRegistry();
+// must be registered in the registry, to enable access to the CamelContext
+registry.put(&quot;insertDbIdemRepo&quot;, kafkaIdempotentRepository);
+CamelContext context = new DefaultCamelContext(registry);
+
+
+// later in RouteBuilder...
+from(&quot;direct:performInsert&quot;)
+    .idempotentConsumer(header(&quot;id&quot;)).messageIdRepositoryRef(&quot;insertDbIdemRepo&quot;)
+        // once-only insert into database
+    .end();]]></script>
+</div></div><p>In XML:</p><div class="code panel pdl" style="border-width: 
1px;"><div class="codeContent panelContent pdl">
+<script class="brush: xml; gutter: false; theme: Default" 
type="syntaxhighlighter"><![CDATA[&lt;!-- simple --&gt;
+&lt;bean id=&quot;insertDbIdemRepo&quot; 
class=&quot;org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository&quot;&gt;
+  &lt;property name=&quot;topic&quot; 
value=&quot;idempotent-db-inserts&quot;/&gt;
+  &lt;property name=&quot;bootstrapServers&quot; 
value=&quot;localhost:9091&quot;/&gt;
+&lt;/bean&gt;
+
+&lt;!-- complex --&gt;
+&lt;bean id=&quot;insertDbIdemRepo&quot; 
class=&quot;org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository&quot;&gt;
+  &lt;property name=&quot;topic&quot; 
value=&quot;idempotent-db-inserts&quot;/&gt;
+  &lt;property name=&quot;maxCacheSize&quot; value=&quot;10000&quot;/&gt;
+  &lt;property name=&quot;consumerConfig&quot;&gt;
+    &lt;props&gt;
+      &lt;prop key=&quot;bootstrap.servers&quot;&gt;localhost:9091&lt;/prop&gt;
+    &lt;/props&gt;
+  &lt;/property&gt;
+  &lt;property name=&quot;producerConfig&quot;&gt;
+    &lt;props&gt;
+      &lt;prop key=&quot;bootstrap.servers&quot;&gt;localhost:9091&lt;/prop&gt;
+    &lt;/props&gt;
+  &lt;/property&gt;
+&lt;/bean&gt;
+]]></script>
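</div></div><p>The event-sourcing behaviour described above can be pictured with a minimal, self-contained sketch. It is not the <code>camel-kafka</code> implementation: the class <code>ReplayedKeyCache</code>, its <code>Change</code> event type and all method names are hypothetical, and a plain list stands in for the Kafka topic. The sketch replays add/remove events in order into a map that is bounded in the spirit of <code>maxCacheSize</code>, evicting the least recently used key once the bound is exceeded.</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names come from camel-kafka.
class ReplayedKeyCache {

    /** The two kinds of idempotent state change that get broadcast. */
    public enum Action { ADD, REMOVE }

    /** One change event, as it might be read back from the topic. */
    public static final class Change {
        final Action action;
        final String key;

        public Change(Action action, String key) {
            this.action = action;
            this.key = key;
        }
    }

    private final Map<String, Boolean> cache;

    public ReplayedKeyCache(final int maxCacheSize) {
        // accessOrder=true keeps entries in least-recently-used order,
        // and removeEldestEntry drops the eldest one when the bound is exceeded.
        this.cache = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxCacheSize;
            }
        };
    }

    /** Applies a single change event to the local cache. */
    public void apply(Change change) {
        if (change.action == Action.ADD) {
            cache.put(change.key, Boolean.TRUE);
        } else {
            cache.remove(change.key);
        }
    }

    /** Rebuilds state by applying every event in log order (the "rewind"). */
    public void replay(List<Change> log) {
        for (Change change : log) {
            apply(change);
        }
    }

    /** Returns true if the key is currently held; does not touch LRU order. */
    public boolean contains(String key) {
        return cache.containsKey(key);
    }

    public static void main(String[] args) {
        ReplayedKeyCache cache = new ReplayedKeyCache(2);
        cache.replay(Arrays.asList(
                new Change(Action.ADD, "a"),
                new Change(Action.ADD, "b"),
                new Change(Action.REMOVE, "a"),
                new Change(Action.ADD, "c")));
        System.out.println("a seen: " + cache.contains("a"));
        System.out.println("c seen: " + cache.contains("c"));
    }
}
```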
 </div></div><p>&#160;</p><p></p><h2 id="Kafka-Endpoints">Endpoints</h2>
 
 <p></p><p>Camel supports the <a shape="rect" 
href="message-endpoint.html">Message Endpoint</a> pattern using the <a 
shape="rect" class="external-link" 
href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/Endpoint.html">Endpoint</a>
 interface. Endpoints are usually created by a <a shape="rect" 
href="component.html">Component</a> and Endpoints are usually referred to in 
the <a shape="rect" href="dsl.html">DSL</a> via their <a shape="rect" 
href="uris.html">URIs</a>. </p>

