Repository: camel
Updated Branches:
  refs/heads/master 9c6d648c0 -> d80f93cca


CAMEL-10196: Camel Kafka doesn't support SASL_PLAINTEXT security protocol


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2645cc18
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2645cc18
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2645cc18

Branch: refs/heads/master
Commit: 2645cc184f549da4c2ce398a8ea9704927524b2e
Parents: 9c6d648
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Thu Jul 28 14:06:10 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Thu Jul 28 14:08:46 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc | 13 +++++---
 .../component/kafka/KafkaConfiguration.java     | 33 ++++++++++++++------
 .../component/kafka/KafkaComponentTest.java     |  2 ++
 3 files changed, 34 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2645cc18/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc 
b/components/camel-kafka/src/main/docs/kafka.adoc
index 575c6a2..aee97a8 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -89,8 +89,9 @@ The Kafka component supports 1 options which are listed below.
 
 
 
+
 // endpoint options: START
-The Kafka component supports 74 endpoint options which are listed below:
+The Kafka component supports 75 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -100,7 +101,12 @@ The Kafka component supports 74 endpoint options which are 
listed below:
 | bridgeEndpoint | common | false | boolean | If the option is true then 
KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the 
inbound message.
 | clientId | common |  | String | The client id is a user-specified string 
sent in each request to help trace calls. It should logically identify the 
application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group 
of consumer processes to which this consumer belongs. By setting the same group 
id multiple processes indicate that they are all part of the same consumer 
group.
+| kerberosBeforeReloginMinTime | common | 60000 | Integer | Login thread sleep 
time between refresh attempts.
+| kerberosInitCmd | common | /usr/bin/kinit | String | Kerberos kinit command 
path. Default is /usr/bin/kinit
+| kerberosRenewJitter | common | 0.05 | Double | Percentage of random jitter 
added to the renewal time.
+| kerberosRenewWindowFactor | common | 0.8 | Double | Login thread will sleep 
until the specified window factor of time from last refresh to ticket's expiry 
has been reached at which time it will try to renew the ticket.
 | partitioner | common | 
org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The 
partitioner class for partitioning messages amongst sub-topics. The default 
partitioner is based on the hash of the key.
+| saslMechanism | common | GSSAPI | String | The Simple Authentication and 
Security Layer (SASL) Mechanism used. For the valid values see 
http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml
 | topic | common |  | String | *Required* Name of the topic to use.
 | autoCommitEnable | consumer | true | Boolean | If true periodically commit 
to ZooKeeper the offset of messages already fetched by the consumer. This 
committed offset will be used when the process fails as the position from which 
the new consumer will begin.
 | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that 
the consumer offsets are committed to zookeeper.
@@ -125,10 +131,6 @@ The Kafka component supports 74 endpoint options which are 
listed below:
 | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory 
the producer can use to buffer records waiting to be sent to the server. If 
records are sent faster than they can be delivered to the server the producer 
will either block or throw an exception based on the preference specified by 
block.on.buffer.full.This setting should correspond roughly to the total memory 
the producer will use but is not a hard bound since not all memory the producer 
uses is used for buffering. Some additional memory will be used for compression 
(if compression is enabled) as well as for maintaining in-flight requests.
 | compressionCodec | producer | none | String | This parameter allows you to 
specify the compression codec for all data generated by this producer. Valid 
values are none gzip and snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections 
after the number of milliseconds specified by this config.
-| kerberosBeforeReloginMinTime | producer | 60000 | Integer | Login thread 
sleep time between refresh attempts.
-| kerberosInitCmd | producer | /usr/bin/kinit | String | Kerberos kinit 
command path. Default is /usr/bin/kinit
-| kerberosRenewJitter | producer | 0.05 | Double | Percentage of random jitter 
added to the renewal time.
-| kerberosRenewWindowFactor | producer | 0.8 | Double | Login thread will 
sleep until the specified window factor of time from last refresh to ticket's 
expiry has been reached at which time it will try to renew the ticket.
 | keySerializerClass | producer |  | String | The serializer class for keys 
(defaults to the same as for messages if nothing is given).
 | lingerMs | producer | 0 | Integer | The producer groups together any records 
that arrive in between request transmissions into a single batched request. 
Normally this occurs only under load when records arrive faster than they can 
be sent out. However in some circumstances the client may want to reduce the 
number of requests even under moderate load. This setting accomplishes this by 
adding a small amount of artificial delaythat is rather than immediately 
sending out a record the producer will wait for up to the given delay to allow 
other records to be sent so that the sends can be batched together. This can be 
thought of as analogous to Nagle's algorithm in TCP. This setting gives the 
upper bound on the delay for batching: once we get batch.size worth of records 
for a partition it will be sent immediately regardless of this setting however 
if we have fewer than this many bytes accumulated for this partition we will 
'linger' for the specified time waiting for more records to show 
 up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5 for 
example would have the effect of reducing the number of requests sent but would 
add up to 5ms of latency to records sent in the absense of load.
 | maxBlockMs | producer | 60000 | Integer | The configuration controls how 
long sending to kafka will block. These methods can be blocked for multiple 
reasons. For e.g: buffer full metadata unavailable.This configuration imposes 
maximum limit on the total time spent in fetching metadata serialization of key 
and value partitioning and allocation of buffer memory when doing a send(). In 
case of partitionsFor() this configuration imposes a maximum time threshold on 
waiting for metadata
@@ -185,6 +187,7 @@ The Kafka component supports 74 endpoint options which are 
listed below:
 
 
 
+
 For more information about Producer/Consumer configuration:
 
 
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/2645cc18/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index c69f32f..6326408 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -213,16 +213,19 @@ public class KafkaConfiguration {
     private Integer reconnectBackoffMs = 50;
     //SASL
     //sasl.kerberos.kinit.cmd
-    @UriParam(label = "producer", defaultValue = 
SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD)
+    @UriParam(label = "common", defaultValue = 
SaslConfigs.DEFAULT_SASL_MECHANISM)
+    private String saslMechanism = SaslConfigs.DEFAULT_SASL_MECHANISM;
+    //sasl.kerberos.kinit.cmd
+    @UriParam(label = "common", defaultValue = 
SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD)
     private String kerberosInitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD;
     //sasl.kerberos.min.time.before.relogin
-    @UriParam(label = "producer", defaultValue = "60000")
+    @UriParam(label = "common", defaultValue = "60000")
     private Integer kerberosBeforeReloginMinTime = 60000;
     //sasl.kerberos.ticket.renew.jitter
-    @UriParam(label = "producer", defaultValue = "0.05")
+    @UriParam(label = "common", defaultValue = "0.05")
     private Double kerberosRenewJitter = 
SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER;
     //sasl.kerberos.ticket.renew.window.factor
-    @UriParam(label = "producer", defaultValue = "0.8")
+    @UriParam(label = "common", defaultValue = "0.8")
     private Double kerberosRenewWindowFactor = 
SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
     //SSL
     //ssl.cipher.suites
@@ -267,8 +270,6 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, 
getPartitioner());
         addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, 
getReceiveBufferBytes());
         addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
getRequestTimeoutMs());
-        //SASL
-        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
getSaslKerberosServiceName());
         // Security protocol
         addPropertyIfNotNull(props, 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
         addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, 
getSendBufferBytes());
@@ -286,10 +287,12 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, 
ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 
getRetryBackoffMs());
         //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
getSaslKerberosServiceName());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, 
getKerberosInitCmd());
         addPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, 
getKerberosBeforeReloginMinTime());
         addPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
         addPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, 
getKerberosRenewWindowFactor());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, 
getSaslMechanism());
         //SSL
         addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
getSslCipherSuites());
         addPropertyIfNotNull(props, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
getSslEndpointAlgorithm());
@@ -319,8 +322,6 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, 
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
         addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
getReceiveBufferBytes());
         addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
getConsumerRequestTimeoutMs());
-        //SASL
-        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
getSaslKerberosServiceName());
         // Security protocol
         addPropertyIfNotNull(props, 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
         addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, 
getSendBufferBytes());
@@ -341,10 +342,12 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, 
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
         addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 
getRetryBackoffMs());
         //SASL
+        addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
getSaslKerberosServiceName());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, 
getKerberosInitCmd());
         addPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, 
getKerberosBeforeReloginMinTime());
         addPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
         addPropertyIfNotNull(props, 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, 
getKerberosRenewWindowFactor());
+        addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, 
getSaslMechanism());
         //SSL
         addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, 
getSslCipherSuites());
         addPropertyIfNotNull(props, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
getSslEndpointAlgorithm());
@@ -762,7 +765,19 @@ public class KafkaConfiguration {
         this.saslKerberosServiceName = saslKerberosServiceName;
     }
 
-    public String getSecurityProtocol() {
+    public String getSaslMechanism() {
+               return saslMechanism;
+       }
+
+    /**
+     * The Simple Authentication and Security Layer (SASL) Mechanism used. 
+     * For the valid values see <a 
href="http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml">http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml</a>
+     */
+       public void setSaslMechanism(String saslMechanism) {
+               this.saslMechanism = saslMechanism;
+       }
+
+       public String getSecurityProtocol() {
         return securityProtocol;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2645cc18/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 6a3773a..572d931 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -148,6 +148,7 @@ public class KafkaComponentTest {
         props.put(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "60000");
         props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, "0.05");
         props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, "0.8");
+        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, "SunX509");
         props.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, "PKIX");
 
@@ -184,6 +185,7 @@ public class KafkaComponentTest {
         params.put("sslTruststoreLocation", "/abc");
         params.put("sslTruststorePassword", "testing");
         params.put("saslKerberosServiceName", "test");
+        params.put("saslMechanism", "PLAIN");
         params.put("securityProtocol", "PLAINTEXT");
         params.put("sslEnabledProtocols", "TLSv1.2");
         params.put("sslKeystoreType", "JKS");

Reply via email to