CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2aa831d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2aa831d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2aa831d Branch: refs/heads/master Commit: b2aa831da8c8f78f7d6ca908c5b33957bbc7fa24 Parents: 038e161 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Mar 9 09:58:36 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Mar 9 10:01:59 2016 +0100 ---------------------------------------------------------------------- components/camel-kafka/pom.xml | 26 +- .../camel/component/kafka/KafkaComponent.java | 22 +- .../component/kafka/KafkaConfiguration.java | 1204 ++++++++++++------ .../camel/component/kafka/KafkaConstants.java | 13 +- .../camel/component/kafka/KafkaConsumer.java | 183 +-- .../camel/component/kafka/KafkaEndpoint.java | 598 +++++---- .../camel/component/kafka/KafkaProducer.java | 64 +- .../component/kafka/BaseEmbeddedKafkaTest.java | 21 +- .../component/kafka/KafkaComponentTest.java | 224 ++-- .../kafka/KafkaConsumerBatchSizeTest.java | 64 +- .../component/kafka/KafkaConsumerFullTest.java | 34 +- .../component/kafka/KafkaConsumerTest.java | 8 +- .../component/kafka/KafkaEndpointTest.java | 13 +- .../component/kafka/KafkaProducerFullTest.java | 145 +-- .../component/kafka/KafkaProducerTest.java | 65 +- .../component/kafka/SimplePartitioner.java | 39 - .../kafka/embedded/EmbeddedKafkaCluster.java | 12 +- .../src/test/resources/log4j.properties | 2 +- 18 files changed, 1618 insertions(+), 1119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml index 1a5f98f..f8bbdb5 100644 --- a/components/camel-kafka/pom.xml +++ b/components/camel-kafka/pom.xml @@ -15,7 +15,8 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -37,31 +38,35 @@ </properties> <dependencies> + + <!-- camel --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> </dependency> + + <!-- kafka java client --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka-version}</version> + </dependency> + + <!-- kafka server for testing using scala --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka-version}</version> + <scope>test</scope> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> </exclusion> - <exclusion> - <artifactId>scala-library</artifactId> - <groupId>org.scala-lang</groupId> - </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <scope>provided</scope> - </dependency> + <!-- test --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> @@ -77,6 +82,7 @@ <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index c9d4c2a..2981b3f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -20,8 +20,6 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.impl.UriEndpointComponent; -import org.apache.camel.util.CamelContextHelper; -import org.apache.camel.util.EndpointHelper; public class KafkaComponent extends UriEndpointComponent { @@ -34,30 +32,14 @@ public class KafkaComponent extends UriEndpointComponent { } @Override - protected KafkaEndpoint createEndpoint(String uri, - String remaining, - Map<String, Object> params) throws Exception { - + protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception { KafkaEndpoint endpoint = new KafkaEndpoint(uri, this); String brokers = remaining.split("\\?")[0]; - Object confparam = params.get("configuration"); - if (confparam != null) { - // need a special handling to resolve the reference before other parameters are set/merged into the config - KafkaConfiguration confobj = null; - if (confparam instanceof KafkaConfiguration) { - confobj = (KafkaConfiguration)confparam; - } else if (confparam instanceof String && EndpointHelper.isReferenceParameter((String)confparam)) { - confobj = (KafkaConfiguration)CamelContextHelper.lookup(getCamelContext(), ((String)confparam).substring(1)); - } - if (confobj != null) { - endpoint.setConfiguration(confobj.copy()); - } - params.remove("configuration"); - } if (brokers != null) { endpoint.getConfiguration().setBrokers(brokers); } setProperties(endpoint, params); return endpoint; } + } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 894df0c..4a948c1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -18,28 +18,28 @@ package org.apache.camel.component.kafka; import java.util.Properties; -import kafka.producer.DefaultPartitioner; -import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; @UriParams -public class KafkaConfiguration implements Cloneable { +public class KafkaConfiguration { + + @UriPath @Metadata(required = "true") + private String brokers; - @UriParam - private String zookeeperConnect; - @UriParam - private String zookeeperHost; - @UriParam(defaultValue = "2181") - private int zookeeperPort = 2181; @UriParam @Metadata(required = "true") private String topic; @UriParam private String groupId; - @UriParam(defaultValue = "DefaultPartitioner") - private String partitioner = DefaultPartitioner.class.getCanonicalName(); + @UriParam(defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER) + private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER; @UriParam(label = "consumer", defaultValue = "10") private int consumerStreams = 10; @UriParam(label = "consumer", defaultValue = "1") @@ -53,134 +53,302 @@ public class KafkaConfiguration implements Cloneable { @UriParam private String clientId; + //key.deserializer + @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER) + private String keyDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER; + //value.deserializer + @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER) + private String valueDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER; + //fetch.min.bytes + @UriParam(label = "consumer", defaultValue = "1024") + private Integer fetchMinBytes = 1024; + //heartbeat.interval.ms + @UriParam(label = "consumer", defaultValue = "3000") + private Integer heartbeatIntervalMs = 3000; + //max.partition.fetch.bytes + @UriParam(label = "consumer", defaultValue = "1048576") + private Integer maxPartitionFetchBytes = 1048576; + //session.timeout.ms + @UriParam(label = "consumer", defaultValue = "30000") + private Integer sessionTimeoutMs = 30000; + //auto.offset.reset + @UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none") + private String autoOffsetReset = "latest"; + //partition.assignment.strategy + @UriParam(label = "consumer", defaultValue = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR) + private String partitionAssignor = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR; + //request.timeout.ms + @UriParam(label = "consumer", defaultValue = "40000") + private Integer consumerRequestTimeoutMs = 40000; + //auto.commit.interval.ms + @UriParam(label = "consumer", defaultValue = "5000") + private Integer autoCommitIntervalMs = 5000; + //check.crcs + @UriParam(label = "consumer", defaultValue = "true") + private Boolean checkCrcs = true; + //fetch.max.wait.ms + @UriParam(label = "consumer", defaultValue = "500") + private Integer fetchWaitMaxMs = 500; + //Consumer configuration properties @UriParam(label = "consumer") private String consumerId; - @UriParam(label = "consumer", defaultValue = "30000") - private Integer socketTimeoutMs = 30 * 1000; - @UriParam(label = "consumer", defaultValue = "" + 64 * 1024) - private Integer socketReceiveBufferBytes = 64 * 1024; - @UriParam(label = "consumer", defaultValue = "" + 1024 * 1024) - private Integer fetchMessageMaxBytes = 1024 * 1024; @UriParam(label = "consumer", defaultValue = "true") private Boolean autoCommitEnable = true; - @UriParam(label = "consumer", defaultValue = "60000") - private Integer autoCommitIntervalMs = 60 * 1000; - @UriParam(label = "consumer", defaultValue = "2") - private Integer queuedMaxMessageChunks = 2; - @UriParam(label = "consumer", defaultValue = "4") - private Integer rebalanceMaxRetries = 4; - @UriParam(label = "consumer", defaultValue = "1") - private Integer fetchMinBytes = 1; - @UriParam(label = "consumer", defaultValue = "100") - private Integer fetchWaitMaxMs = 100; - @UriParam(label = "consumer", defaultValue = "2000") - private Integer rebalanceBackoffMs = 2000; - @UriParam(label = "consumer", defaultValue = "200") - private Integer refreshLeaderBackoffMs = 200; - @UriParam(label = "consumer", defaultValue = "largest", enums = "smallest,largest,fail") - private String autoOffsetReset = "largest"; - @UriParam(label = "consumer") - private Integer consumerTimeoutMs; - @UriParam(label = "consumer", defaultValue = "zookeeper", enums = "zookeeper,kafka") - private String offsetsStorage = "zookeeper"; - @UriParam(label = "consumer", defaultValue = "true") - private Boolean dualCommitEnabled = true; - - //Zookeepr configuration properties - @UriParam - private Integer zookeeperSessionTimeoutMs; - @UriParam - private Integer zookeeperConnectionTimeoutMs; - @UriParam - private Integer zookeeperSyncTimeMs; //Producer configuration properties - @UriPath - private String brokers; - @UriParam(label = "producer", defaultValue = "sync", enums = "async,sync") - private String producerType = "sync"; - @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy") - private String compressionCodec = "none"; - @UriParam(label = "producer") - private String compressedTopics; - @UriParam(label = "producer", defaultValue = "3") - private Integer messageSendMaxRetries = 3; @UriParam(label = "producer", defaultValue = "100") private Integer retryBackoffMs = 100; - @UriParam(label = "producer", defaultValue = "600000") - private Integer topicMetadataRefreshIntervalMs = 600 * 1000; - - //Sync producer config - @UriParam(label = "producer", defaultValue = "" + 100 * 1024) - private Integer sendBufferBytes = 100 * 1024; - @UriParam(label = "producer", defaultValue = "0") - private short requestRequiredAcks; - @UriParam(label = "producer", defaultValue = "10000") - private Integer requestTimeoutMs = 10000; //Async producer config - @UriParam(label = "producer", defaultValue = "5000") - private Integer queueBufferingMaxMs = 5000; @UriParam(label = "producer", defaultValue = "10000") private Integer queueBufferingMaxMessages = 10000; @UriParam(label = "producer") - private Integer queueEnqueueTimeoutMs; - @UriParam(label = "producer", defaultValue = "200") - private Integer batchNumMessages = 200; - @UriParam(label = "producer") private String serializerClass; @UriParam(label = "producer") private String keySerializerClass; + @UriParam(label = "producer", defaultValue = "1") + private Integer requestRequiredAcks = 1; + //buffer.memory + @UriParam(label = "producer", defaultValue = "33554432") + private Integer bufferMemorySize = 33554432; + //compression.type + @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy,lz4") + private String compressionCodec = "none"; + //retries + @UriParam(label = "producer", defaultValue = "0") + private Integer retries = 0; + // SSL + // ssl.key.password + @UriParam(label = "producer") + private String sslKeyPassword; + // ssl.keystore.location + @UriParam(label = "producer") + private String sslKeystoreLocation; + // ssl.keystore.password + @UriParam(label = "producer") + private String sslKeystorePassword; + //ssl.truststore.location + @UriParam(label = "producer") + private String sslTruststoreLocation; + //ssl.truststore.password + @UriParam(label = "producer") + private String sslTruststorePassword; + //batch.size + @UriParam(label = "producer", defaultValue = "16384") + private Integer producerBatchSize = 16384; + //connections.max.idle.ms + @UriParam(label = "producer", defaultValue = "540000") + private Integer connectionMaxIdleMs = 540000; + //linger.ms + @UriParam(label = "producer", defaultValue = "0") + private Integer lingerMs = 0; + //linger.ms + @UriParam(label = "producer", defaultValue = "60000") + private Integer maxBlockMs = 60000; + //max.request.size + @UriParam(label = "producer", defaultValue = "1048576") + private Integer maxRequestSize = 1048576; + //receive.buffer.bytes + @UriParam(label = "producer", defaultValue = "32768") + private Integer receiveBufferBytes = 32768; + //request.timeout.ms + @UriParam(label = "producer", defaultValue = "30000") + private Integer requestTimeoutMs = 30000; + // SASL & sucurity Protocol + //sasl.kerberos.service.name + @UriParam(label = "producer") + private String saslKerberosServiceName; + //security.protocol + @UriParam(label = "producer", defaultValue = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL) + private String securityProtocol = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL; + //send.buffer.bytes + @UriParam(label = "producer", defaultValue = "131072") + private Integer sendBufferBytes = 131072; + //SSL + //ssl.enabled.protocols + @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS) + private String sslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS; + //ssl.keystore.type + @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE) + private String sslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE; + //ssl.protocol + @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_PROTOCOL) + private String sslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL; + //ssl.provider + @UriParam(label = "producer") + private String sslProvider; + //ssl.truststore.type + @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE) + private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE; + //timeout.ms + @UriParam(label = "producer", defaultValue = "30000") + private Integer timeoutMs = 30000; + //block.on.buffer.full + @UriParam(label = "producer", defaultValue = "false") + private Boolean blockOnBufferFull = false; + //max.in.flight.requests.per.connection + @UriParam(label = "producer", defaultValue = "5") + private Integer maxInFlightRequest = 5; + //metadata.fetch.timeout.ms + @UriParam(label = "producer", defaultValue = "60000") + private Integer metadataFetchTimeoutMs = 600 * 1000; + //metadata.max.age.ms + @UriParam(label = "producer", defaultValue = "300000") + private Integer metadataMaxAgeMs = 300000; + //metric.reporters + @UriParam(label = "producer") + private String metricReporters; + //metrics.num.samples + @UriParam(label = "producer", defaultValue = "2") + private Integer noOfMetricsSample = 2; + //metrics.sample.window.ms + @UriParam(label = "producer", defaultValue = "30000") + private Integer metricsSampleWindowMs = 30000; + //reconnect.backoff.ms + @UriParam(label = "producer", defaultValue = "50") + private Integer reconnectBackoffMs = 50; + //SASL + //sasl.kerberos.kinit.cmd + @UriParam(label = "producer", defaultValue = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD) + private String kerberosInitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD; + //sasl.kerberos.min.time.before.relogin + @UriParam(label = "producer", defaultValue = "60000") + private Integer kerberosBeforeReloginMinTime = 60000; + //sasl.kerberos.ticket.renew.jitter + @UriParam(label = "producer", defaultValue = "0.05") + private Double kerberosRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER; + //sasl.kerberos.ticket.renew.window.factor + @UriParam(label = "producer", defaultValue = "0.8") + private Double kerberosRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR; + //SSL + //ssl.cipher.suites + @UriParam(label = "producer") + private String sslCipherSuites; + //ssl.endpoint.identification.algorithm + @UriParam(label = "producer") + private String sslEndpointAlgorithm; + //ssl.keymanager.algorithm + @UriParam(label = "producer", defaultValue = "SunX509") + private String sslKeymanagerAlgorithm = "SunX509"; + //ssl.trustmanager.algorithm + @UriParam(label = "producer", defaultValue = "PKIX") + private String sslTrustmanagerAlgorithm = "PKIX"; + public KafkaConfiguration() { } public Properties createProducerProperties() { Properties props = new Properties(); - addPropertyIfNotNull(props, "request.required.acks", getRequestRequiredAcks()); - addPropertyIfNotNull(props, "partitioner.class", getPartitioner()); - addPropertyIfNotNull(props, "serializer.class", getSerializerClass()); - addPropertyIfNotNull(props, "key.serializer.class", getKeySerializerClass()); - addPropertyIfNotNull(props, "request.timeout.ms", getRequestTimeoutMs()); - addPropertyIfNotNull(props, "producer.type", getProducerType()); - addPropertyIfNotNull(props, "compression.codec", getCompressionCodec()); - addPropertyIfNotNull(props, "compressed.topics", getCompressedTopics()); - addPropertyIfNotNull(props, "message.send.max.retries", getMessageSendMaxRetries()); - addPropertyIfNotNull(props, "retry.backoff.ms", getRetryBackoffMs()); - addPropertyIfNotNull(props, "topic.metadata.refresh.interval.ms", getTopicMetadataRefreshIntervalMs()); - addPropertyIfNotNull(props, "queue.buffering.max.ms", getQueueBufferingMaxMs()); - addPropertyIfNotNull(props, "queue.buffering.max.messages", getQueueBufferingMaxMessages()); - addPropertyIfNotNull(props, "queue.enqueue.timeout.ms", getQueueEnqueueTimeoutMs()); - addPropertyIfNotNull(props, "batch.num.messages", getBatchNumMessages()); - addPropertyIfNotNull(props, "send.buffer.bytes", getSendBufferBytes()); - addPropertyIfNotNull(props, "client.id", getClientId()); + addPropertyIfNotNull(props, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializerClass()); + addPropertyIfNotNull(props, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializerClass()); + addPropertyIfNotNull(props, ProducerConfig.ACKS_CONFIG, getRequestRequiredAcks()); + addPropertyIfNotNull(props, ProducerConfig.BUFFER_MEMORY_CONFIG, getBufferMemorySize()); + addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec()); + addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries()); + // SSL + addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword()); + addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getRetries()); + addPropertyIfNotNull(props, ProducerConfig.BATCH_SIZE_CONFIG, getProducerBatchSize()); + addPropertyIfNotNull(props, ProducerConfig.CLIENT_ID_CONFIG, getClientId()); + addPropertyIfNotNull(props, ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs()); + addPropertyIfNotNull(props, ProducerConfig.LINGER_MS_CONFIG, getLingerMs()); + addPropertyIfNotNull(props, ProducerConfig.MAX_BLOCK_MS_CONFIG, getMaxBlockMs()); + addPropertyIfNotNull(props, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getMaxRequestSize()); + addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, getPartitioner()); + addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes()); + addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, getRequestTimeoutMs()); + //SASL + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName()); + // Security protocol + addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol()); + addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes()); + //SSL + addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType()); + addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol()); + addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType()); + addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs()); + addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull()); + addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest()); + addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs()); + addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs()); + addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters()); + addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample()); + addPropertyIfNotNull(props, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs()); + addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs()); + addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); + //SASL + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd()); + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime()); + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter()); + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor()); + //SSL + addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites()); + addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm()); + return props; } public Properties createConsumerProperties() { Properties props = new Properties(); - addPropertyIfNotNull(props, "consumer.id", getConsumerId()); - addPropertyIfNotNull(props, "socket.timeout.ms", getSocketTimeoutMs()); - addPropertyIfNotNull(props, "socket.receive.buffer.bytes", getSocketReceiveBufferBytes()); - addPropertyIfNotNull(props, "fetch.message.max.bytes", getFetchMessageMaxBytes()); - addPropertyIfNotNull(props, "auto.commit.enable", isAutoCommitEnable()); - addPropertyIfNotNull(props, "auto.commit.interval.ms", getAutoCommitIntervalMs()); - addPropertyIfNotNull(props, "queued.max.message.chunks", getQueueBufferingMaxMessages()); - addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes()); - addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs()); - addPropertyIfNotNull(props, "queued.max.message.chunks", getQueuedMaxMessageChunks()); - addPropertyIfNotNull(props, "rebalance.max.retries", getRebalanceMaxRetries()); - addPropertyIfNotNull(props, "rebalance.backoff.ms", getRebalanceBackoffMs()); - addPropertyIfNotNull(props, "refresh.leader.backoff.ms", getRefreshLeaderBackoffMs()); - addPropertyIfNotNull(props, "auto.offset.reset", getAutoOffsetReset()); - addPropertyIfNotNull(props, "consumer.timeout.ms", getConsumerTimeoutMs()); - addPropertyIfNotNull(props, "client.id", getClientId()); - addPropertyIfNotNull(props, "zookeeper.session.timeout.ms", getZookeeperSessionTimeoutMs()); - addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs()); - addPropertyIfNotNull(props, "zookeeper.sync.time.ms", getZookeeperSyncTimeMs()); - addPropertyIfNotNull(props, "offsets.storage", getOffsetsStorage()); - addPropertyIfNotNull(props, "dual.commit.enabled", isDualCommitEnabled()); + addPropertyIfNotNull(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer()); + addPropertyIfNotNull(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer()); + addPropertyIfNotNull(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getFetchMinBytes()); + addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs()); + addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes()); + addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs()); + // SSL + addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword()); + addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset()); + addPropertyIfNotNull(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs()); + addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoCommitEnable()); + addPropertyIfNotNull(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor()); + addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes()); + addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs()); + //SASL + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName()); + // Security protocol + addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol()); + addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes()); + //SSL + addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType()); + addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol()); + addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType()); + addPropertyIfNotNull(props, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitIntervalMs()); + addPropertyIfNotNull(props, ConsumerConfig.CHECK_CRCS_CONFIG, getCheckCrcs()); + addPropertyIfNotNull(props, ConsumerConfig.CLIENT_ID_CONFIG, getClientId()); + addPropertyIfNotNull(props, ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchWaitMaxMs()); + addPropertyIfNotNull(props, ConsumerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs()); + addPropertyIfNotNull(props, ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters()); + addPropertyIfNotNull(props, ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample()); + addPropertyIfNotNull(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs()); + addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs()); + addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs()); + //SASL + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd()); + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime()); + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter()); + addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor()); + //SSL + addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites()); + addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm()); + addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm()); + addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm()); return props; } @@ -191,65 +359,6 @@ public class KafkaConfiguration implements Cloneable { } } - public String getZookeeperConnect() { - if (this.zookeeperConnect != null) { - return zookeeperConnect; - } else { - return getZookeeperHost() + ":" + getZookeeperPort(); - } - } - - /** - * Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. - * To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the - * form hostname1:port1,hostname2:port2,hostname3:port3. - * The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts its data - * under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in its connection string. - * For example to give a chroot path of /chroot/path you would give the connection - * string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path. - */ - public void setZookeeperConnect(String zookeeperConnect) { - this.zookeeperConnect = zookeeperConnect; - - // connect overrides host and port - this.zookeeperHost = null; - this.zookeeperPort = -1; - } - - public String getZookeeperHost() { - return zookeeperHost; - } - - /** - * The zookeeper host to use. - * <p/> - * To connect to multiple zookeeper hosts use the zookeeperConnect option instead. - * <p/> - * This option can only be used if zookeeperConnect is not in use. - */ - public void setZookeeperHost(String zookeeperHost) { - if (this.zookeeperConnect == null) { - this.zookeeperHost = zookeeperHost; - } - } - - public int getZookeeperPort() { - return zookeeperPort; - } - - /** - * The zookeeper port to use - * <p/> - * To connect to multiple zookeeper hosts use the zookeeperConnect option instead. - * <p/> - * This option can only be used if zookeeperConnect is not in use. - */ - public void setZookeeperPort(int zookeeperPort) { - if (this.zookeeperConnect == null) { - this.zookeeperPort = zookeeperPort; - } - } - public String getGroupId() { return groupId; } @@ -278,7 +387,7 @@ public class KafkaConfiguration implements Cloneable { } /** - * Name of the topic to use + * Name of the topic to use. */ public void setTopic(String topic) { this.topic = topic; @@ -351,440 +460,741 @@ public class KafkaConfiguration implements Cloneable { this.consumerId = consumerId; } - public Integer getSocketTimeoutMs() { - return socketTimeoutMs; + public Boolean isAutoCommitEnable() { + return autoCommitEnable; } /** - * The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. + * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. + * This committed offset will be used when the process fails as the position from which the new consumer will begin. */ - public void setSocketTimeoutMs(Integer socketTimeoutMs) { - this.socketTimeoutMs = socketTimeoutMs; + public void setAutoCommitEnable(Boolean autoCommitEnable) { + this.autoCommitEnable = autoCommitEnable; } - public Integer getSocketReceiveBufferBytes() { - return socketReceiveBufferBytes; + public Integer getAutoCommitIntervalMs() { + return autoCommitIntervalMs; } /** - * The socket receive buffer for network requests + * The frequency in ms that the consumer offsets are committed to zookeeper. */ - public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) { - this.socketReceiveBufferBytes = socketReceiveBufferBytes; + public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) { + this.autoCommitIntervalMs = autoCommitIntervalMs; } - public Integer getFetchMessageMaxBytes() { - return fetchMessageMaxBytes; + public Integer getFetchMinBytes() { + return fetchMinBytes; } /** - * The number of byes of messages to attempt to fetch for each topic-partition in each fetch request. - * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. - * The fetch request size must be at least as large as the maximum message size the server allows or else it - * is possible for the producer to send messages larger than the consumer can fetch. + * The minimum amount of data the server should return for a fetch request. + * If insufficient data is available the request will wait for that much data to accumulate before answering the request. */ - public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) { - this.fetchMessageMaxBytes = fetchMessageMaxBytes; + public void setFetchMinBytes(Integer fetchMinBytes) { + this.fetchMinBytes = fetchMinBytes; } - public Boolean isAutoCommitEnable() { - return autoCommitEnable; + public Integer getFetchWaitMaxMs() { + return fetchWaitMaxMs; } /** - * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. - * This committed offset will be used when the process fails as the position from which the new consumer will begin. + * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */ - public void setAutoCommitEnable(Boolean autoCommitEnable) { - this.autoCommitEnable = autoCommitEnable; + public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) { + this.fetchWaitMaxMs = fetchWaitMaxMs; } - public Integer getAutoCommitIntervalMs() { - return autoCommitIntervalMs; + public String getAutoOffsetReset() { + return autoOffsetReset; } /** - * The frequency in ms that the consumer offsets are committed to zookeeper. + * What to do when there is no initial offset in ZooKeeper or if an offset is out of range: + * smallest : automatically reset the offset to the smallest offset + * largest : automatically reset the offset to the largest offset + * fail: throw exception to the consumer */ - public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) { - this.autoCommitIntervalMs = autoCommitIntervalMs; + public void setAutoOffsetReset(String autoOffsetReset) { + this.autoOffsetReset = autoOffsetReset; } - public Integer getQueuedMaxMessageChunks() { - return queuedMaxMessageChunks; + public String getBrokers() { + return brokers; } /** - * Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes. + * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). + * The socket connections for sending the actual data will be established based on the broker information returned in the metadata. + * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers. + * <p/> + * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation. */ - public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) { - this.queuedMaxMessageChunks = queuedMaxMessageChunks; + public void setBrokers(String brokers) { + this.brokers = brokers; } - public Integer getRebalanceMaxRetries() { - return rebalanceMaxRetries; + public String getCompressionCodec() { + return compressionCodec; } /** - * When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer. - * If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. - * This setting controls the maximum number of attempts before giving up. + * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". */ - public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) { - this.rebalanceMaxRetries = rebalanceMaxRetries; + public void setCompressionCodec(String compressionCodec) { + this.compressionCodec = compressionCodec; } - public Integer getFetchMinBytes() { - return fetchMinBytes; + public Integer getRetryBackoffMs() { + return retryBackoffMs; } /** - * The minimum amount of data the server should return for a fetch request. - * If insufficient data is available the request will wait for that much data to accumulate before answering the request. + * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. + * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. */ - public void setFetchMinBytes(Integer fetchMinBytes) { - this.fetchMinBytes = fetchMinBytes; + public void setRetryBackoffMs(Integer retryBackoffMs) { + this.retryBackoffMs = retryBackoffMs; } - public Integer getFetchWaitMaxMs() { - return fetchWaitMaxMs; + public Integer getSendBufferBytes() { + return sendBufferBytes; } /** - * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes + * Socket write buffer size */ - public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) { - this.fetchWaitMaxMs = fetchWaitMaxMs; + public void setSendBufferBytes(Integer sendBufferBytes) { + this.sendBufferBytes = sendBufferBytes; } - public Integer getRebalanceBackoffMs() { - return rebalanceBackoffMs; + public Integer getRequestTimeoutMs() { + return requestTimeoutMs; } /** - * Backoff time between retries during rebalance. + * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client. */ - public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) { - this.rebalanceBackoffMs = rebalanceBackoffMs; + public void setRequestTimeoutMs(Integer requestTimeoutMs) { + this.requestTimeoutMs = requestTimeoutMs; } - public Integer getRefreshLeaderBackoffMs() { - return refreshLeaderBackoffMs; + public Integer getQueueBufferingMaxMessages() { + return queueBufferingMaxMessages; } /** - * Backoff time to wait before trying to determine the leader of a partition that has just lost its leader. + * The maximum number of unsent messages that can be queued up the producer when using async + * mode before either the producer must be blocked or data must be dropped. */ - public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) { - this.refreshLeaderBackoffMs = refreshLeaderBackoffMs; + public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) { + this.queueBufferingMaxMessages = queueBufferingMaxMessages; } - public String getAutoOffsetReset() { - return autoOffsetReset; + public String getSerializerClass() { + if (serializerClass == null) { + return KafkaConstants.KAFKA_DEFAULT_SERIALIZER; + } + return serializerClass; } /** - * What to do when there is no initial offset in ZooKeeper or if an offset is out of range: - * smallest : automatically reset the offset to the smallest offset - * largest : automatically reset the offset to the largest offset - * fail: throw exception to the consumer + * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[]. + * The default class is kafka.serializer.DefaultEncoder */ - public void setAutoOffsetReset(String autoOffsetReset) { - this.autoOffsetReset = autoOffsetReset; + public void setSerializerClass(String serializerClass) { + this.serializerClass = serializerClass; } - public Integer getConsumerTimeoutMs() { - return consumerTimeoutMs; + public String getKeySerializerClass() { + if (keySerializerClass == null) { + return KafkaConstants.KAFKA_DEFAULT_SERIALIZER; + } + return keySerializerClass; } /** - * Throw a timeout exception to the consumer if no message is available for consumption after the specified interval + * The serializer class for keys (defaults to the same as for messages if nothing is given). */ - public void setConsumerTimeoutMs(Integer consumerTimeoutMs) { - this.consumerTimeoutMs = consumerTimeoutMs; + public void setKeySerializerClass(String keySerializerClass) { + this.keySerializerClass = keySerializerClass; } - public Integer getZookeeperSessionTimeoutMs() { - return zookeeperSessionTimeoutMs; + public String getKerberosInitCmd() { + return kerberosInitCmd; } /** - * ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur. + * Kerberos kinit command path. Default is /usr/bin/kinit */ - public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) { - this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs; + public void setKerberosInitCmd(String kerberosInitCmd) { + this.kerberosInitCmd = kerberosInitCmd; } - public Integer getZookeeperConnectionTimeoutMs() { - return zookeeperConnectionTimeoutMs; + public Integer getKerberosBeforeReloginMinTime() { + return kerberosBeforeReloginMinTime; } /** - * The max time that the client waits while establishing a connection to zookeeper. + * Login thread sleep time between refresh attempts. */ - public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) { - this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs; + public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) { + this.kerberosBeforeReloginMinTime = kerberosBeforeReloginMinTime; } - public Integer getZookeeperSyncTimeMs() { - return zookeeperSyncTimeMs; + public Double getKerberosRenewJitter() { + return kerberosRenewJitter; } /** - * How far a ZK follower can be behind a ZK leader + * Percentage of random jitter added to the renewal time. */ - public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) { - this.zookeeperSyncTimeMs = zookeeperSyncTimeMs; + public void setKerberosRenewJitter(Double kerberosRenewJitter) { + this.kerberosRenewJitter = kerberosRenewJitter; } - public String getBrokers() { - return brokers; + public Double getKerberosRenewWindowFactor() { + return kerberosRenewWindowFactor; } /** - * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). - * The socket connections for sending the actual data will be established based on the broker information returned in the metadata. - * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers. - * <p/> - * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation. + * Login thread will sleep until the specified window factor of time from last + * refresh to ticket's expiry has been reached, at which time it will try to renew the ticket. */ - public void setBrokers(String brokers) { - this.brokers = brokers; + public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) { + this.kerberosRenewWindowFactor = kerberosRenewWindowFactor; } - public String getProducerType() { - return producerType; + public String getSslCipherSuites() { + return sslCipherSuites; } /** - * This parameter specifies whether the messages are sent asynchronously in a background thread. - * Valid values are (1) async for asynchronous send and (2) sync for synchronous send. - * By setting the producer to async we allow batching together of requests (which is great for throughput) - * but open the possibility of a failure of the client machine dropping unsent data. + * A list of cipher suites. This is a named combination of authentication, encryption, + * MAC and key exchange algorithm used to negotiate the security settings for a network connection + * using TLS or SSL network protocol.By default all the available cipher suites are supported. */ - public void setProducerType(String producerType) { - this.producerType = producerType; + public void setSslCipherSuites(String sslCipherSuites) { + this.sslCipherSuites = sslCipherSuites; } - public String getCompressionCodec() { - return compressionCodec; + public String getSslEndpointAlgorithm() { + return sslEndpointAlgorithm; } /** - * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". + * The endpoint identification algorithm to validate server hostname using server certificate. */ - public void setCompressionCodec(String compressionCodec) { - this.compressionCodec = compressionCodec; + public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) { + this.sslEndpointAlgorithm = sslEndpointAlgorithm; } - public String getCompressedTopics() { - return compressedTopics; + public String getSslKeymanagerAlgorithm() { + return sslKeymanagerAlgorithm; } /** - * This parameter allows you to set whether compression should be turned on for particular topics. - * If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any. - * If the list of compressed topics is empty, then enable the specified compression codec for all topics. - * If the compression codec is NoCompressionCodec, compression is disabled for all topics + * The algorithm used by key manager factory for SSL connections. Default value is the key + * manager factory algorithm configured for the Java Virtual Machine. */ - public void setCompressedTopics(String compressedTopics) { - this.compressedTopics = compressedTopics; + public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) { + this.sslKeymanagerAlgorithm = sslKeymanagerAlgorithm; } - public Integer getMessageSendMaxRetries() { - return messageSendMaxRetries; + public String getSslTrustmanagerAlgorithm() { + return sslTrustmanagerAlgorithm; } /** - * This property will cause the producer to automatically retry a failed send request. - * This property specifies the number of retries when such failures occur. Note that setting a non-zero value here - * can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost. + * The algorithm used by trust manager factory for SSL connections. Default value is the + * trust manager factory algorithm configured for the Java Virtual Machine. */ - public void setMessageSendMaxRetries(Integer messageSendMaxRetries) { - this.messageSendMaxRetries = messageSendMaxRetries; + public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) { + this.sslTrustmanagerAlgorithm = sslTrustmanagerAlgorithm; } - public Integer getRetryBackoffMs() { - return retryBackoffMs; + public String getSslEnabledProtocols() { + return sslEnabledProtocols; } /** - * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. - * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. + * The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default. */ - public void setRetryBackoffMs(Integer retryBackoffMs) { - this.retryBackoffMs = retryBackoffMs; + public void setSslEnabledProtocols(String sslEnabledProtocols) { + this.sslEnabledProtocols = sslEnabledProtocols; } - public Integer getTopicMetadataRefreshIntervalMs() { - return topicMetadataRefreshIntervalMs; + public String getSslKeystoreType() { + return sslKeystoreType; } /** - * The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, - * leader not available...). It will also poll regularly (default: every 10min so 600000ms). - * If you set this to a negative value, metadata will only get refreshed on failure. - * If you set this to zero, the metadata will get refreshed after each message sent (not recommended). - * Important note: the refresh happen only AFTER the message is sent, so if the producer never - * sends a message the metadata is never refreshed + * The file format of the key store file. This is optional for client. Default value is JKS */ - public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) { - this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs; + public void setSslKeystoreType(String sslKeystoreType) { + this.sslKeystoreType = sslKeystoreType; } - public Integer getSendBufferBytes() { - return sendBufferBytes; + public String getSslProtocol() { + return sslProtocol; } /** - * Socket write buffer size + * The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. + * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, + * but their usage is discouraged due to known security vulnerabilities. */ - public void setSendBufferBytes(Integer sendBufferBytes) { - this.sendBufferBytes = sendBufferBytes; + public void setSslProtocol(String sslProtocol) { + this.sslProtocol = sslProtocol; + } + + public String getSslProvider() { + return sslProvider; + } + + /** + * The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. + */ + public void setSslProvider(String sslProvider) { + this.sslProvider = sslProvider; + } + + public String getSslTruststoreType() { + return sslTruststoreType; + } + + /** + * The file format of the trust store file. Default value is JKS. + */ + public void setSslTruststoreType(String sslTruststoreType) { + this.sslTruststoreType = sslTruststoreType; + } + + public String getSaslKerberosServiceName() { + return saslKerberosServiceName; + } + + /** + * The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS + * config or in Kafka's config. + */ + public void setSaslKerberosServiceName(String saslKerberosServiceName) { + this.saslKerberosServiceName = saslKerberosServiceName; + } + + public String getSecurityProtocol() { + return securityProtocol; + } + + /** + * Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported. + */ + public void setSecurityProtocol(String securityProtocol) { + this.securityProtocol = securityProtocol; + } + + public String getSslKeyPassword() { + return sslKeyPassword; + } + + /** + * The password of the private key in the key store file. This is optional for client. + */ + public void setSslKeyPassword(String sslKeyPassword) { + this.sslKeyPassword = sslKeyPassword; + } + + public String getSslKeystoreLocation() { + return sslKeystoreLocation; } - public short getRequestRequiredAcks() { + /** + * The location of the key store file. This is optional for client and can be used for two-way + * authentication for client. + */ + public void setSslKeystoreLocation(String sslKeystoreLocation) { + this.sslKeystoreLocation = sslKeystoreLocation; + } + + public String getSslKeystorePassword() { + return sslKeystorePassword; + } + + /** + * The store password for the key store file.This is optional for client and only needed + * if ssl.keystore.location is configured. + */ + public void setSslKeystorePassword(String sslKeystorePassword) { + this.sslKeystorePassword = sslKeystorePassword; + } + + public String getSslTruststoreLocation() { + return sslTruststoreLocation; + } + + /** + * The location of the trust store file. + */ + public void setSslTruststoreLocation(String sslTruststoreLocation) { + this.sslTruststoreLocation = sslTruststoreLocation; + } + + public String getSslTruststorePassword() { + return sslTruststorePassword; + } + + + /** + * The password for the trust store file. + */ + public void setSslTruststorePassword(String sslTruststorePassword) { + this.sslTruststorePassword = sslTruststorePassword; + } + + public Integer getBufferMemorySize() { + return bufferMemorySize; + } + + /** + * The total bytes of memory the producer can use to buffer records waiting to be sent to the server. + * If records are sent faster than they can be delivered to the server the producer will either block + * or throw an exception based on the preference specified by block.on.buffer.full.This setting should + * correspond roughly to the total memory the producer will use, but is not a hard bound since not all + * memory the producer uses is used for buffering. Some additional memory will be used for compression + * (if compression is enabled) as well as for maintaining in-flight requests. + */ + public void setBufferMemorySize(Integer bufferMemorySize) { + this.bufferMemorySize = bufferMemorySize; + } + + public Boolean getBlockOnBufferFull() { + return blockOnBufferFull; + } + + /** + * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. + * By default this setting is true and we block, however in some scenarios blocking is not desirable and it + * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw + * a BufferExhaustedException if a recrord is sent and the buffer space is full. + */ + public void setBlockOnBufferFull(Boolean blockOnBufferFull) { + this.blockOnBufferFull = blockOnBufferFull; + } + + public Integer getRequestRequiredAcks() { return requestRequiredAcks; } /** - * This value controls when a produce request is considered completed. Specifically, - * how many other brokers must have committed the data to their log and acknowledged this to the leader? - * Typical values are (0, 1 or -1): - * 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). - * This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails). - * 1, which means that the producer gets an acknowledgement after the leader replica has received the data. - * This option provides better durability as the client waits until the server acknowledges the request as successful - * (only messages that were written to the now-dead leader but not yet replicated will be lost). - * -1, The producer gets an acknowledgement after all in-sync replicas have received the data. - * This option provides the greatest level of durability. - * However, it does not completely eliminate the risk of message loss because the number of in sync replicas may, - * in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas - * (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting. - * Please read the Replication section of the design documentation for a more in-depth discussion. + * The number of acknowledgments the producer requires the leader to have received before considering a request complete. + * This controls the durability of records that are sent. The following settings are common: + * acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. + * The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server + * has received the record in this case, and the retries configuration will not take effect (as the client won't generally + * know of any failures). The offset given back for each record will always be set to -1. + * acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement + * from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have + * replicated it then the record will be lost. + * acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the + * record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. */ - public void setRequestRequiredAcks(short requestRequiredAcks) { + public void setRequestRequiredAcks(Integer requestRequiredAcks) { this.requestRequiredAcks = requestRequiredAcks; } - public Integer getRequestTimeoutMs() { - return requestTimeoutMs; + public Integer getRetries() { + return retries; } /** - * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client. + * Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. + * Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially + * change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second + * succeeds, then the second record may appear first. */ - public void setRequestTimeoutMs(Integer requestTimeoutMs) { - this.requestTimeoutMs = requestTimeoutMs; + public void setRetries(Integer retries) { + this.retries = retries; } - public Integer getQueueBufferingMaxMs() { - return queueBufferingMaxMs; + public Integer getProducerBatchSize() { + return producerBatchSize; } /** - * Maximum time to buffer data when using async mode. - * For example a setting of 100 will try to batch together 100ms of messages to send at once. - * This will improve throughput but adds message delivery latency due to the buffering. + * The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. + * This helps performance on both the client and the server. This configuration controls the default batch size in bytes. + * No attempt will be made to batch records larger than this size.Requests sent to brokers will contain multiple batches, one for each + * partition with data available to be sent.A small batch size will make batching less common and may reduce throughput (a batch size of zero + * will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the + * specified batch size in anticipation of additional records. */ - public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) { - this.queueBufferingMaxMs = queueBufferingMaxMs; + public void setProducerBatchSize(Integer producerBatchSize) { + this.producerBatchSize = producerBatchSize; } - public Integer getQueueBufferingMaxMessages() { - return queueBufferingMaxMessages; + public Integer getConnectionMaxIdleMs() { + return connectionMaxIdleMs; } /** - * The maximum number of unsent messages that can be queued up the producer when using async - * mode before either the producer must be blocked or data must be dropped. + * Close idle connections after the number of milliseconds specified by this config. */ - public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) { - this.queueBufferingMaxMessages = queueBufferingMaxMessages; + public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) { + this.connectionMaxIdleMs = connectionMaxIdleMs; } - public Integer getQueueEnqueueTimeoutMs() { - return queueEnqueueTimeoutMs; + public Integer getLingerMs() { + return lingerMs; } /** - * The amount of time to block before dropping messages when running in async mode and the buffer has reached - * queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full - * (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. + * The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this + * occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce + * the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delayâthat is, + * rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that + * the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on + * the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, + * however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more + * records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the + * number of requests sent but would add up to 5ms of latency to records sent in the absense of load. */ - public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) { - this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs; + public void setLingerMs(Integer lingerMs) { + this.lingerMs = lingerMs; } - public Integer getBatchNumMessages() { - return batchNumMessages; + public Integer getMaxBlockMs() { + return maxBlockMs; } /** - * The number of messages to send in one batch when using async mode. - * The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. + * The configuration controls how long sending to kafka will block. These methods can be + * blocked for multiple reasons. For e.g: buffer full, metadata unavailable.This configuration imposes maximum limit on the total time spent + * in fetching metadata, serialization of key and value, partitioning and allocation of buffer memory when doing a send(). In case of + * partitionsFor(), this configuration imposes a maximum time threshold on waiting for metadata */ - public void setBatchNumMessages(Integer batchNumMessages) { - this.batchNumMessages = batchNumMessages; + public void setMaxBlockMs(Integer maxBlockMs) { + this.maxBlockMs = maxBlockMs; } - public String getSerializerClass() { - return serializerClass; + public Integer getMaxRequestSize() { + return maxRequestSize; } /** - * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[]. - * The default class is kafka.serializer.DefaultEncoder + * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size + * which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid + * sending huge requests. */ - public void setSerializerClass(String serializerClass) { - this.serializerClass = serializerClass; + public void setMaxRequestSize(Integer maxRequestSize) { + this.maxRequestSize = maxRequestSize; } - public String getKeySerializerClass() { - return keySerializerClass; + public Integer getReceiveBufferBytes() { + return receiveBufferBytes; } /** - * The serializer class for keys (defaults to the same as for messages if nothing is given). + * The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. */ - public void setKeySerializerClass(String keySerializerClass) { - this.keySerializerClass = keySerializerClass; + public void setReceiveBufferBytes(Integer receiveBufferBytes) { + this.receiveBufferBytes = receiveBufferBytes; } - public String getOffsetsStorage() { - return offsetsStorage; + public Integer getTimeoutMs() { + return timeoutMs; } /** - * Select where offsets should be stored (zookeeper or kafka). + * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the + * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments + * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include + * the network latency of the request. */ - public void setOffsetsStorage(String offsetsStorage) { - this.offsetsStorage = offsetsStorage; + public void setTimeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; } - public Boolean isDualCommitEnabled() { - return dualCommitEnabled; + public Integer getMaxInFlightRequest() { + return maxInFlightRequest; } /** - * If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). - * This is required during migration from zookeeper-based offset storage to kafka-based offset storage. - * With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated - * to the new version that commits offsets to the broker (instead of directly to ZooKeeper). + * The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting + * is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled). */ - public void setDualCommitEnabled(Boolean dualCommitEnabled) { - this.dualCommitEnabled = dualCommitEnabled; + public void setMaxInFlightRequest(Integer maxInFlightRequest) { + this.maxInFlightRequest = maxInFlightRequest; + } + + public Integer getMetadataFetchTimeoutMs() { + return metadataFetchTimeoutMs; } /** - * Returns a copy of this configuration + * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. + * This fetch to succeed before throwing an exception back to the client. */ - public KafkaConfiguration copy() { - try { - return (KafkaConfiguration)clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeCamelException(e); - } + public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) { + this.metadataFetchTimeoutMs = metadataFetchTimeoutMs; + } + + public Integer getMetadataMaxAgeMs() { + return metadataMaxAgeMs; + } + + /** + * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership + * changes to proactively discover any new brokers or partitions. + */ + public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) { + this.metadataMaxAgeMs = metadataMaxAgeMs; + } + + public String getMetricReporters() { + return metricReporters; + } + + /** + * A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be + * notified of new metric creation. The JmxReporter is always included to register JMX statistics. + */ + public void setMetricReporters(String metricReporters) { + this.metricReporters = metricReporters; + } + + public Integer getNoOfMetricsSample() { + return noOfMetricsSample; + } + + /** + * The number of samples maintained to compute metrics. + */ + public void setNoOfMetricsSample(Integer noOfMetricsSample) { + this.noOfMetricsSample = noOfMetricsSample; + } + + public Integer getMetricsSampleWindowMs() { + return metricsSampleWindowMs; + } + + /** + * The number of samples maintained to compute metrics. + */ + public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) { + this.metricsSampleWindowMs = metricsSampleWindowMs; + } + + public Integer getReconnectBackoffMs() { + return reconnectBackoffMs; + } + + /** + * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host + * in a tight loop. This backoff applies to all requests sent by the consumer to the broker. + */ + public void setReconnectBackoffMs(Integer reconnectBackoffMs) { + this.reconnectBackoffMs = reconnectBackoffMs; + } + + public Integer getHeartbeatIntervalMs() { + return heartbeatIntervalMs; + } + + /** + * The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. + * Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new + * consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set + * no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. + */ + public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) { + this.heartbeatIntervalMs = heartbeatIntervalMs; + } + + public Integer getMaxPartitionFetchBytes() { + return maxPartitionFetchBytes; + } + + /** + * The maximum amount of data per-partition the server will return. The maximum total memory used for + * a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the + * maximum message size the server allows or else it is possible for the producer to send messages larger + * than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message + * on a certain partition. + */ + public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) { + this.maxPartitionFetchBytes = maxPartitionFetchBytes; + } + + public Integer getSessionTimeoutMs() { + return sessionTimeoutMs; + } + + /** + * The timeout used to detect failures when using Kafka's group management facilities. + */ + public void setSessionTimeoutMs(Integer sessionTimeoutMs) { + this.sessionTimeoutMs = sessionTimeoutMs; + } + + public String getPartitionAssignor() { + return partitionAssignor; + } + + /** + * The class name of the partition assignment strategy that the client will use to distribute + * partition ownership amongst consumer instances when group management is used + */ + public void setPartitionAssignor(String partitionAssignor) { + this.partitionAssignor = partitionAssignor; + } + + public Integer getConsumerRequestTimeoutMs() { + return consumerRequestTimeoutMs; + } + + /** + * The configuration controls the maximum amount of time the client will wait for the response + * of a request. If the response is not received before the timeout elapses the client will resend + * the request if necessary or fail the request if retries are exhausted. + */ + public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) { + this.consumerRequestTimeoutMs = consumerRequestTimeoutMs; + } + + public Boolean getCheckCrcs() { + return checkCrcs; + } + + /** + * Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk + * corruption to the messages occurred. This check adds some overhead, so it may be disabled in + * cases seeking extreme performance. + */ + public void setCheckCrcs(Boolean checkCrcs) { + this.checkCrcs = checkCrcs; + } + + public String getKeyDeserializer() { + return keyDeserializer; + } + + /** + * Deserializer class for key that implements the Deserializer interface. + */ + public void setKeyDeserializer(String keyDeserializer) { + this.keyDeserializer = keyDeserializer; + } + + public String getValueDeserializer() { + return valueDeserializer; + } + + /** + * Deserializer class for value that implements the Deserializer interface. + */ + public void setValueDeserializer(String valueDeserializer) { + this.valueDeserializer = valueDeserializer; } } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index d3ff482..db99a09 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -16,21 +16,20 @@ */ package org.apache.camel.component.kafka; -/** - * - */ public final class KafkaConstants { - public static final String DEFAULT_GROUP = "group1"; - public static final String PARTITION_KEY = "kafka.PARTITION_KEY"; - public static final String PARTITION = "kafka.PARTITION"; - public static final String KEY = "kafka.KEY"; + public static final String PARTITION = "kafka.EXCHANGE_NAME"; + public static final String KEY = "kafka.CONTENT_TYPE"; public static final String TOPIC = "kafka.TOPIC"; public static final String OFFSET = "kafka.OFFSET"; public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder"; public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder"; + public static final String KAFKA_DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + public static final String KAFKA_DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner"; + public static final String PARTITIONER_RANGE_ASSIGNOR = "org.apache.kafka.clients.consumer.RangeAssignor"; private KafkaConstants() { // Utility class