Repository: camel Updated Branches: refs/heads/master 9c6d648c0 -> d80f93cca
CAMEL-10196: Camel Kafka doesn't support SASL_PLAINTEXT security protocol Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2645cc18 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2645cc18 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2645cc18 Branch: refs/heads/master Commit: 2645cc184f549da4c2ce398a8ea9704927524b2e Parents: 9c6d648 Author: Andrea Cosentino <anco...@gmail.com> Authored: Thu Jul 28 14:06:10 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Thu Jul 28 14:08:46 2016 +0200 ---------------------------------------------------------------------- components/camel-kafka/src/main/docs/kafka.adoc | 13 +++++--- .../component/kafka/KafkaConfiguration.java | 33 ++++++++++++++------ .../component/kafka/KafkaComponentTest.java | 2 ++ 3 files changed, 34 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2645cc18/components/camel-kafka/src/main/docs/kafka.adoc ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc index 575c6a2..aee97a8 100644 --- a/components/camel-kafka/src/main/docs/kafka.adoc +++ b/components/camel-kafka/src/main/docs/kafka.adoc @@ -89,8 +89,9 @@ The Kafka component supports 1 options which are listed below. + // endpoint options: START -The Kafka component supports 74 endpoint options which are listed below: +The Kafka component supports 75 endpoint options which are listed below: {% raw %} [width="100%",cols="2s,1,1m,1m,5",options="header"] @@ -100,7 +101,12 @@ The Kafka component supports 74 endpoint options which are listed below: | bridgeEndpoint | common | false | boolean | If the option is true then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message. | clientId | common | | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request. | groupId | common | | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group. +| kerberosBeforeReloginMinTime | common | 60000 | Integer | Login thread sleep time between refresh attempts. +| kerberosInitCmd | common | /usr/bin/kinit | String | Kerberos kinit command path. Default is /usr/bin/kinit +| kerberosRenewJitter | common | 0.05 | Double | Percentage of random jitter added to the renewal time. +| kerberosRenewWindowFactor | common | 0.8 | Double | Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached at which time it will try to renew the ticket. | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key. +| saslMechanism | common | GSSAPI | String | The The Simple Authentication and Security Layer (SASL) Mechanism used. For the valid values see http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml | topic | common | | String | *Required* Name of the topic to use. | autoCommitEnable | consumer | true | Boolean | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that the consumer offsets are committed to zookeeper. @@ -125,10 +131,6 @@ The Kafka component supports 74 endpoint options which are listed below: | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests. | compressionCodec | producer | none | String | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy. | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections after the number of milliseconds specified by this config. -| kerberosBeforeReloginMinTime | producer | 60000 | Integer | Login thread sleep time between refresh attempts. -| kerberosInitCmd | producer | /usr/bin/kinit | String | Kerberos kinit command path. Default is /usr/bin/kinit -| kerberosRenewJitter | producer | 0.05 | Double | Percentage of random jitter added to the renewal time. -| kerberosRenewWindowFactor | producer | 0.8 | Double | Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached at which time it will try to renew the ticket. | keySerializerClass | producer | | String | The serializer class for keys (defaults to the same as for messages if nothing is given). | lingerMs | producer | 0 | Integer | The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delaythat is rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5 for example would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load. | maxBlockMs | producer | 60000 | Integer | The configuration controls how long sending to kafka will block. These methods can be blocked for multiple reasons. For e.g: buffer full metadata unavailable.This configuration imposes maximum limit on the total time spent in fetching metadata serialization of key and value partitioning and allocation of buffer memory when doing a send(). In case of partitionsFor() this configuration imposes a maximum time threshold on waiting for metadata @@ -185,6 +187,7 @@ The Kafka component supports 74 endpoint options which are listed below: + For more information about Producer/Consumer configuration: http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs] http://git-wip-us.apache.org/repos/asf/camel/blob/2645cc18/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index c69f32f..6326408 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -213,16 +213,19 @@ public class KafkaConfiguration { private Integer reconnectBackoffMs = 50; //SASL //sasl.kerberos.kinit.cmd - @UriParam(label = "producer", defaultValue = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD) + @UriParam(label = "common", defaultValue = SaslConfigs.DEFAULT_SASL_MECHANISM) + private String saslMechanism = SaslConfigs.DEFAULT_SASL_MECHANISM; + //sasl.kerberos.kinit.cmd + @UriParam(label = "common", defaultValue = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD) private String kerberosInitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD; //sasl.kerberos.min.time.before.relogin - @UriParam(label = "producer", defaultValue = "60000") + @UriParam(label = "common", defaultValue = "60000") private Integer kerberosBeforeReloginMinTime = 60000; //sasl.kerberos.ticket.renew.jitter - @UriParam(label = "producer", defaultValue = "0.05") + @UriParam(label = "common", defaultValue = "0.05") private Double kerberosRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER; //sasl.kerberos.ticket.renew.window.factor - @UriParam(label = "producer", defaultValue = "0.8") + @UriParam(label = "common", defaultValue = "0.8") private Double kerberosRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR; //SSL //ssl.cipher.suites @@ -267,8 +270,6 @@ public class KafkaConfiguration { addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, getPartitioner()); addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes()); addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, getRequestTimeoutMs()); - //SASL - addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName()); // Security protocol addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol()); addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes()); @@ -286,10 +287,12 @@ public class KafkaConfiguration { addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs()); addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); //SASL + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName()); addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd()); addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime()); addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter()); addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor()); + addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, getSaslMechanism()); //SSL addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites()); addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm()); @@ -319,8 +322,6 @@ public class KafkaConfiguration { addPropertyIfNotNull(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor()); addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes()); addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs()); - //SASL - addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName()); // Security protocol addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol()); addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes()); @@ -341,10 +342,12 @@ public class KafkaConfiguration { addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs()); addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); //SASL + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName()); addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd()); addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime()); addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter()); addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor()); + addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, getSaslMechanism()); //SSL addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites()); addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm()); @@ -762,7 +765,19 @@ public class KafkaConfiguration { this.saslKerberosServiceName = saslKerberosServiceName; } - public String getSecurityProtocol() { + public String getSaslMechanism() { + return saslMechanism; + } + + /** + * The Simple Authentication and Security Layer (SASL) Mechanism used. + * For the valid values see <a href="http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml">http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml</a> + */ + public void setSaslMechanism(String saslMechanism) { + this.saslMechanism = saslMechanism; + } + + public String getSecurityProtocol() { return securityProtocol; } http://git-wip-us.apache.org/repos/asf/camel/blob/2645cc18/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index 6a3773a..572d931 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -148,6 +148,7 @@ public class KafkaComponentTest { props.put(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "60000"); props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, "0.05"); props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, "0.8"); + props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, "SunX509"); props.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, "PKIX"); @@ -184,6 +185,7 @@ public class KafkaComponentTest { params.put("sslTruststoreLocation", "/abc"); params.put("sslTruststorePassword", "testing"); params.put("saslKerberosServiceName", "test"); + params.put("saslMechanism", "PLAIN"); params.put("securityProtocol", "PLAINTEXT"); params.put("sslEnabledProtocols", "TLSv1.2"); params.put("sslKeystoreType", "JKS");