http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 94893fb..e65ed3b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -16,30 +16,19 @@ */ package org.apache.camel.component.kafka; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.Arrays; import java.util.Properties; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; + import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; -import org.apache.camel.util.ObjectHelper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ public class KafkaConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class); @@ -47,15 +36,14 @@ public class KafkaConsumer extends DefaultConsumer { protected ExecutorService executor; private final KafkaEndpoint endpoint; private final Processor processor; - private Map<ConsumerConnector, CyclicBarrier> consumerBarriers; - + public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; - this.consumerBarriers = new HashMap<ConsumerConnector, CyclicBarrier>(); - if (endpoint.getZookeeperConnect() == null) { - throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified"); + + if (endpoint.getBrokers() == null) { + throw new IllegalArgumentException("BootStrap servers must be specified"); } if (endpoint.getGroupId() == null) { throw new IllegalArgumentException("groupId must not be null"); @@ -64,57 +52,25 @@ public class KafkaConsumer extends DefaultConsumer { Properties getProps() { Properties props = endpoint.getConfiguration().createConsumerProperties(); - props.put("zookeeper.connect", endpoint.getZookeeperConnect()); - props.put("group.id", endpoint.getGroupId()); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getGroupId()); return props; } @Override protected void doStart() throws Exception { super.doStart(); - log.info("Starting Kafka consumer"); - + LOG.info("Starting Kafka consumer"); executor = endpoint.createExecutor(); for (int i = 0; i < endpoint.getConsumersCount(); i++) { - ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps())); - Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); - topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams()); - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); - List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic()); - - // commit periodically - if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) { - if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0) - && endpoint.getConsumerStreams() > 1) { - LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams."); - } - CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer)); - for (final KafkaStream<byte[], byte[]> stream : streams) { - executor.submit(new BatchingConsumerTask(stream, barrier)); - } - consumerBarriers.put(consumer, barrier); - } else { - // auto commit - for (final KafkaStream<byte[], byte[]> stream : streams) { - executor.submit(new AutoCommitConsumerTask(consumer, stream)); - } - consumerBarriers.put(consumer, null); - } + executor.submit(new KafkaFetchRecords(endpoint.getTopic(), i + "", getProps())); } - } @Override protected void doStop() throws Exception { super.doStop(); - log.info("Stopping Kafka consumer"); - - for (ConsumerConnector consumer : consumerBarriers.keySet()) { - if (consumer != null) { - consumer.shutdown(); - } - } - consumerBarriers.clear(); + LOG.info("Stopping Kafka consumer"); if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { @@ -126,102 +82,57 @@ public class KafkaConsumer extends DefaultConsumer { executor = null; } - class BatchingConsumerTask implements Runnable { + class KafkaFetchRecords implements Runnable { - private KafkaStream<byte[], byte[]> stream; - private CyclicBarrier barrier; + private final org.apache.kafka.clients.consumer.KafkaConsumer consumer; + private final String topicName; + private final String threadId; + private final Properties kafkaProps; - public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) { - this.stream = stream; - this.barrier = barrier; + public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) { + this.topicName = topicName; + this.threadId = topicName + "-" + "Thread " + id; + this.kafkaProps = kafkaProps; + this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); } + @Override + @SuppressWarnings("unchecked") public void run() { - int processed = 0; - boolean consumerTimeout; - MessageAndMetadata<byte[], byte[]> mm; - ConsumerIterator<byte[], byte[]> it = stream.iterator(); - boolean hasNext = true; - while (hasNext) { - try { - consumerTimeout = false; - // only poll the next message if we are allowed to run and are not suspending - if (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) { - mm = it.next(); - Exchange exchange = endpoint.createKafkaExchange(mm); + try { + LOG.debug("Subscribing {} to topic {}", threadId, topicName); + consumer.subscribe(Arrays.asList(topicName)); + while (isRunAllowed() && !isSuspendingOrSuspended()) { + ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE); + for (ConsumerRecord<Object, Object> record : records) { + if (LOG.isTraceEnabled()) { + LOG.trace("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value()); + } + Exchange exchange = endpoint.createKafkaExchange(record); try { processor.process(exchange); } catch (Exception e) { - LOG.error(e.getMessage(), e); + getExceptionHandler().handleException("Error during processing", exchange, e); } processed++; - } else { - // we don't need to process the message - hasNext = false; - } - } catch (ConsumerTimeoutException e) { - LOG.debug("Consumer timeout occurred due " + e.getMessage(), e); - consumerTimeout = true; - } - - if (processed >= endpoint.getBatchSize() || consumerTimeout - || (processed > 0 && !hasNext)) { // Need to commit the offset for the last round - try { - barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS); - if (!consumerTimeout) { - processed = 0; + // if autocommit is false + if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) { + if (processed >= endpoint.getBatchSize()) { + consumer.commitSync(); + processed = 0; + } } - } catch (Exception e) { - getExceptionHandler().handleException("Error waiting for batch to complete", e); - break; } } + LOG.debug("Unsubscribing {} from topic {}", threadId, topicName); + consumer.unsubscribe(); + } catch (Exception e) { + getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e); } } - } - - class CommitOffsetTask implements Runnable { - - private final ConsumerConnector consumer; - - public CommitOffsetTask(ConsumerConnector consumer) { - this.consumer = consumer; - } - @Override - public void run() { - LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer)); - consumer.commitOffsets(); - } } - class AutoCommitConsumerTask implements Runnable { - - private final ConsumerConnector consumer; - private KafkaStream<byte[], byte[]> stream; - - public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) { - this.consumer = consumer; - this.stream = stream; - } - - public void run() { - ConsumerIterator<byte[], byte[]> it = stream.iterator(); - // only poll the next message if we are allowed to run and are not suspending - while (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) { - MessageAndMetadata<byte[], byte[]> mm = it.next(); - Exchange exchange = endpoint.createKafkaExchange(mm); - try { - processor.process(exchange); - } catch (Exception e) { - getExceptionHandler().handleException("Error during processing", exchange, e); - } - } - // no more data so commit offset - LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer)); - consumer.commitOffsets(); - } - } }
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 760ec19a..5a56c39 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -16,10 +16,9 @@ */ package org.apache.camel.component.kafka; +import java.util.Properties; import java.util.concurrent.ExecutorService; -import kafka.message.MessageAndMetadata; - import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -29,6 +28,7 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; +import org.apache.kafka.clients.consumer.ConsumerRecord; /** * The kafka component allows messages to be sent to (or consumed from) Apache Kafka brokers. @@ -38,8 +38,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS @UriParam private KafkaConfiguration configuration = new KafkaConfiguration(); - - @UriParam(description = "If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.", defaultValue = "false") + @UriParam private boolean bridgeEndpoint; public KafkaEndpoint() { @@ -73,30 +72,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS @Override public Producer createProducer() throws Exception { - String msgClassName = getConfiguration().getSerializerClass(); - String keyClassName = getConfiguration().getKeySerializerClass(); - if (msgClassName == null) { - msgClassName = KafkaConstants.KAFKA_DEFAULT_ENCODER; - } - if (keyClassName == null) { - keyClassName = msgClassName; - } - - ClassLoader cl = getClass().getClassLoader(); - - Class<?> k; - try { - k = cl.loadClass(keyClassName); - } catch (ClassNotFoundException x) { - k = getCamelContext().getClassResolver().resolveMandatoryClass(keyClassName); - } - Class<?> v; - try { - v = cl.loadClass(msgClassName); - } catch (ClassNotFoundException x) { - v = getCamelContext().getClassResolver().resolveMandatoryClass(msgClassName); - } - return createProducer(k, v, this); + return createProducer(this); } @Override @@ -104,411 +80,597 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS return true; } + @Override + public boolean isMultipleConsumersSupported() { + return true; + } + public ExecutorService createExecutor() { return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams()); } - public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) { + public Exchange createKafkaExchange(ConsumerRecord record) { Exchange exchange = super.createExchange(); Message message = exchange.getIn(); - message.setHeader(KafkaConstants.PARTITION, mm.partition()); - message.setHeader(KafkaConstants.TOPIC, mm.topic()); - message.setHeader(KafkaConstants.OFFSET, mm.offset()); - if (mm.key() != null) { - message.setHeader(KafkaConstants.KEY, new String(mm.key())); + message.setHeader(KafkaConstants.PARTITION, record.partition()); + message.setHeader(KafkaConstants.TOPIC, record.topic()); + message.setHeader(KafkaConstants.OFFSET, record.offset()); + if (record.key() != null) { + message.setHeader(KafkaConstants.KEY, record.key()); } - message.setBody(mm.message()); + message.setBody(record.value()); return exchange; } - protected <K, V> KafkaProducer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass, KafkaEndpoint endpoint) { - return new KafkaProducer<K, V>(endpoint); + protected KafkaProducer createProducer(KafkaEndpoint endpoint) { + return new KafkaProducer(endpoint); } // Delegated properties from the configuration //------------------------------------------------------------------------- - public String getZookeeperConnect() { - return configuration.getZookeeperConnect(); + public Properties createProducerProperties() { + return configuration.createProducerProperties(); + } + + public void setValueDeserializer(String valueDeserializer) { + configuration.setValueDeserializer(valueDeserializer); } - public void setZookeeperConnect(String zookeeperConnect) { - configuration.setZookeeperConnect(zookeeperConnect); + public void setRequestTimeoutMs(Integer requestTimeoutMs) { + configuration.setRequestTimeoutMs(requestTimeoutMs); } - public String getZookeeperHost() { - return configuration.getZookeeperHost(); + public void setProducerBatchSize(Integer producerBatchSize) { + configuration.setProducerBatchSize(producerBatchSize); } - public void setZookeeperHost(String zookeeperHost) { - configuration.setZookeeperHost(zookeeperHost); + public void setRetryBackoffMs(Integer retryBackoffMs) { + configuration.setRetryBackoffMs(retryBackoffMs); } - public int getZookeeperPort() { - return configuration.getZookeeperPort(); + public void setNoOfMetricsSample(Integer noOfMetricsSample) { + configuration.setNoOfMetricsSample(noOfMetricsSample); } - public void setZookeeperPort(int zookeeperPort) { - configuration.setZookeeperPort(zookeeperPort); + public String getMetricReporters() { + return configuration.getMetricReporters(); } - public String getGroupId() { - return configuration.getGroupId(); + public void setSslKeystoreType(String sslKeystoreType) { + configuration.setSslKeystoreType(sslKeystoreType); + } + + public void setSslCipherSuites(String sslCipherSuites) { + configuration.setSslCipherSuites(sslCipherSuites); + } + + public void setClientId(String clientId) { + configuration.setClientId(clientId); + } + + public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) { + configuration.setMetricsSampleWindowMs(metricsSampleWindowMs); + } + + public String getKeyDeserializer() { + return configuration.getKeyDeserializer(); + } + + public int getConsumersCount() { + return configuration.getConsumersCount(); + } + + public String getSslKeyPassword() { + return configuration.getSslKeyPassword(); + } + + public void setSendBufferBytes(Integer sendBufferBytes) { + configuration.setSendBufferBytes(sendBufferBytes); + } + + public Boolean isAutoCommitEnable() { + return configuration.isAutoCommitEnable(); + } + + public Integer getMaxBlockMs() { + return configuration.getMaxBlockMs(); + } + + public String getConsumerId() { + return configuration.getConsumerId(); + } + + public void setSslProtocol(String sslProtocol) { + configuration.setSslProtocol(sslProtocol); + } + + public void setReceiveBufferBytes(Integer receiveBufferBytes) { + configuration.setReceiveBufferBytes(receiveBufferBytes); + } + + public Boolean getCheckCrcs() { + return configuration.getCheckCrcs(); } public void setGroupId(String groupId) { configuration.setGroupId(groupId); } - public String getPartitioner() { - return configuration.getPartitioner(); + public String getCompressionCodec() { + return configuration.getCompressionCodec(); } - public void setPartitioner(String partitioner) { - configuration.setPartitioner(partitioner); + public String getGroupId() { + return configuration.getGroupId(); } - public String getTopic() { - return configuration.getTopic(); + public void setSslTruststoreLocation(String sslTruststoreLocation) { + configuration.setSslTruststoreLocation(sslTruststoreLocation); } - public void setTopic(String topic) { - configuration.setTopic(topic); + public String getKerberosInitCmd() { + return configuration.getKerberosInitCmd(); } - public String getBrokers() { - return configuration.getBrokers(); + public String getAutoOffsetReset() { + return configuration.getAutoOffsetReset(); + } + + public void setAutoCommitEnable(Boolean autoCommitEnable) { + configuration.setAutoCommitEnable(autoCommitEnable); + } + + public void setSerializerClass(String serializerClass) { + configuration.setSerializerClass(serializerClass); + } + + public Integer getQueueBufferingMaxMessages() { + return configuration.getQueueBufferingMaxMessages(); + } + + public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) { + configuration.setSslEndpointAlgorithm(sslEndpointAlgorithm); + } + + public void setRetries(Integer retries) { + configuration.setRetries(retries); + } + + public void setAutoOffsetReset(String autoOffsetReset) { + configuration.setAutoOffsetReset(autoOffsetReset); + } + + public Integer getSessionTimeoutMs() { + return configuration.getSessionTimeoutMs(); + } + + public Integer getBufferMemorySize() { + return configuration.getBufferMemorySize(); + } + + public String getKeySerializerClass() { + return configuration.getKeySerializerClass(); + } + + public void setSslProvider(String sslProvider) { + configuration.setSslProvider(sslProvider); + } + + public void setFetchMinBytes(Integer fetchMinBytes) { + configuration.setFetchMinBytes(fetchMinBytes); + } + + public Integer getAutoCommitIntervalMs() { + return configuration.getAutoCommitIntervalMs(); + } + + public void setKeySerializerClass(String keySerializerClass) { + configuration.setKeySerializerClass(keySerializerClass); + } + + public Integer getConnectionMaxIdleMs() { + return configuration.getConnectionMaxIdleMs(); + } + + public Integer getReceiveBufferBytes() { + return configuration.getReceiveBufferBytes(); } public void setBrokers(String brokers) { configuration.setBrokers(brokers); } - public int getConsumerStreams() { - return configuration.getConsumerStreams(); + public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) { + configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs); } - public void setConsumerStreams(int consumerStreams) { - configuration.setConsumerStreams(consumerStreams); + public String getValueDeserializer() { + return configuration.getValueDeserializer(); } - public int getBatchSize() { - return configuration.getBatchSize(); + public String getPartitioner() { + return configuration.getPartitioner(); } - public void setBatchSize(int batchSize) { - this.configuration.setBatchSize(batchSize); + public String getSslTruststoreLocation() { + return configuration.getSslTruststoreLocation(); + } + + public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) { + configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs); + } + + public String getSslProvider() { + return configuration.getSslProvider(); + } + + public void setMetricReporters(String metricReporters) { + configuration.setMetricReporters(metricReporters); + } + + public void setSslTruststorePassword(String sslTruststorePassword) { + configuration.setSslTruststorePassword(sslTruststorePassword); + } + + public void setMaxInFlightRequest(Integer maxInFlightRequest) { + configuration.setMaxInFlightRequest(maxInFlightRequest); + } + + public String getTopic() { + return configuration.getTopic(); } public int getBarrierAwaitTimeoutMs() { return configuration.getBarrierAwaitTimeoutMs(); } - public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) { - this.configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs); + public Integer getFetchMinBytes() { + return configuration.getFetchMinBytes(); } - public int getConsumersCount() { - return this.configuration.getConsumersCount(); + public Integer getHeartbeatIntervalMs() { + return configuration.getHeartbeatIntervalMs(); } - public void setConsumersCount(int consumersCount) { - this.configuration.setConsumersCount(consumersCount); + public void setKeyDeserializer(String keyDeserializer) { + configuration.setKeyDeserializer(keyDeserializer); } - public void setConsumerTimeoutMs(int consumerTimeoutMs) { - configuration.setConsumerTimeoutMs(consumerTimeoutMs); + public Integer getMaxRequestSize() { + return configuration.getMaxRequestSize(); } - public void setSerializerClass(String serializerClass) { - configuration.setSerializerClass(serializerClass); + public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) { + configuration.setMetadataMaxAgeMs(metadataMaxAgeMs); } - public void setQueueBufferingMaxMessages(int queueBufferingMaxMessages) { - configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages); + public String getSslKeystoreType() { + return configuration.getSslKeystoreType(); } - public int getFetchWaitMaxMs() { - return configuration.getFetchWaitMaxMs(); + public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) { + configuration.setKerberosRenewWindowFactor(kerberosRenewWindowFactor); } - public Integer getZookeeperConnectionTimeoutMs() { - return configuration.getZookeeperConnectionTimeoutMs(); + public Integer getKerberosBeforeReloginMinTime() { + return configuration.getKerberosBeforeReloginMinTime(); } - public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) { - configuration.setZookeeperConnectionTimeoutMs(zookeeperConnectionTimeoutMs); + public String getSslEnabledProtocols() { + return configuration.getSslEnabledProtocols(); } - public void setMessageSendMaxRetries(int messageSendMaxRetries) { - configuration.setMessageSendMaxRetries(messageSendMaxRetries); + public Integer getMaxInFlightRequest() { + return configuration.getMaxInFlightRequest(); } - public int getQueueBufferingMaxMs() { - return configuration.getQueueBufferingMaxMs(); + public Integer getProducerBatchSize() { + return configuration.getProducerBatchSize(); } - public void setRequestRequiredAcks(short requestRequiredAcks) { - configuration.setRequestRequiredAcks(requestRequiredAcks); + public void setSslKeystorePassword(String sslKeystorePassword) { + configuration.setSslKeystorePassword(sslKeystorePassword); } - public Integer getRebalanceBackoffMs() { - return configuration.getRebalanceBackoffMs(); + public Boolean getBlockOnBufferFull() { + return configuration.getBlockOnBufferFull(); } - public void setQueueEnqueueTimeoutMs(int queueEnqueueTimeoutMs) { - configuration.setQueueEnqueueTimeoutMs(queueEnqueueTimeoutMs); + public void setCheckCrcs(Boolean checkCrcs) { + configuration.setCheckCrcs(checkCrcs); } - public int getFetchMessageMaxBytes() { - return configuration.getFetchMessageMaxBytes(); + public int getConsumerStreams() { + return configuration.getConsumerStreams(); } - public int getQueuedMaxMessages() { - return configuration.getQueuedMaxMessageChunks(); + public void setConsumersCount(int consumersCount) { + configuration.setConsumersCount(consumersCount); } - public int getAutoCommitIntervalMs() { - return configuration.getAutoCommitIntervalMs(); + public int getBatchSize() { + return configuration.getBatchSize(); } - public void setSocketTimeoutMs(int socketTimeoutMs) { - configuration.setSocketTimeoutMs(socketTimeoutMs); + public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) { + configuration.setAutoCommitIntervalMs(autoCommitIntervalMs); } - public void setAutoCommitIntervalMs(int autoCommitIntervalMs) { - configuration.setAutoCommitIntervalMs(autoCommitIntervalMs); + public void setSslTruststoreType(String sslTruststoreType) { + configuration.setSslTruststoreType(sslTruststoreType); } - public void setRequestTimeoutMs(int requestTimeoutMs) { - configuration.setRequestTimeoutMs(requestTimeoutMs); + public Integer getConsumerRequestTimeoutMs() { + return configuration.getConsumerRequestTimeoutMs(); } - public void setCompressedTopics(String compressedTopics) { - configuration.setCompressedTopics(compressedTopics); + public String getSslKeystorePassword() { + return configuration.getSslKeystorePassword(); } - public int getSocketReceiveBufferBytes() { - return configuration.getSocketReceiveBufferBytes(); + public void setSslKeyPassword(String sslKeyPassword) { + configuration.setSslKeyPassword(sslKeyPassword); } - public void setSendBufferBytes(int sendBufferBytes) { - configuration.setSendBufferBytes(sendBufferBytes); + public void setBlockOnBufferFull(Boolean blockOnBufferFull) { + configuration.setBlockOnBufferFull(blockOnBufferFull); } - public void setFetchMessageMaxBytes(int fetchMessageMaxBytes) { - configuration.setFetchMessageMaxBytes(fetchMessageMaxBytes); + public Integer getRequestRequiredAcks() { + return configuration.getRequestRequiredAcks(); } - public int getRefreshLeaderBackoffMs() { - return configuration.getRefreshLeaderBackoffMs(); + public Double getKerberosRenewWindowFactor() { + return configuration.getKerberosRenewWindowFactor(); } - public void setFetchWaitMaxMs(int fetchWaitMaxMs) { - configuration.setFetchWaitMaxMs(fetchWaitMaxMs); + public void setKerberosInitCmd(String kerberosInitCmd) { + configuration.setKerberosInitCmd(kerberosInitCmd); } - public int getTopicMetadataRefreshIntervalMs() { - return configuration.getTopicMetadataRefreshIntervalMs(); + public Integer getRetryBackoffMs() { + return configuration.getRetryBackoffMs(); } - public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) { - configuration.setZookeeperSessionTimeoutMs(zookeeperSessionTimeoutMs); + public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) { + configuration.setSslTrustmanagerAlgorithm(sslTrustmanagerAlgorithm); } - public Integer getConsumerTimeoutMs() { - return configuration.getConsumerTimeoutMs(); + public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) { + configuration.setConsumerRequestTimeoutMs(consumerRequestTimeoutMs); } - public void setAutoCommitEnable(boolean autoCommitEnable) { - configuration.setAutoCommitEnable(autoCommitEnable); + public void setReconnectBackoffMs(Integer reconnectBackoffMs) { + configuration.setReconnectBackoffMs(reconnectBackoffMs); } - public String getCompressionCodec() { - return configuration.getCompressionCodec(); + public void setKerberosRenewJitter(Double kerberosRenewJitter) { + configuration.setKerberosRenewJitter(kerberosRenewJitter); + } + + public String getSslKeystoreLocation() { + return configuration.getSslKeystoreLocation(); } - public void setProducerType(String producerType) { - configuration.setProducerType(producerType); + public Integer getNoOfMetricsSample() { + return configuration.getNoOfMetricsSample(); + } + + public String getSslKeymanagerAlgorithm() { + return configuration.getSslKeymanagerAlgorithm(); + } + + public void setConsumerId(String consumerId) { + configuration.setConsumerId(consumerId); } public String getClientId() { return configuration.getClientId(); } - public int getFetchMinBytes() { - return configuration.getFetchMinBytes(); + public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) { + configuration.setFetchWaitMaxMs(fetchWaitMaxMs); } - public String getAutoOffsetReset() { - return configuration.getAutoOffsetReset(); + public String getSslCipherSuites() { + return configuration.getSslCipherSuites(); } - public void setRefreshLeaderBackoffMs(int refreshLeaderBackoffMs) { - configuration.setRefreshLeaderBackoffMs(refreshLeaderBackoffMs); + public void setRequestRequiredAcks(Integer requestRequiredAcks) { + configuration.setRequestRequiredAcks(requestRequiredAcks); } - public void setAutoOffsetReset(String autoOffsetReset) { - configuration.setAutoOffsetReset(autoOffsetReset); + public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) { + configuration.setConnectionMaxIdleMs(connectionMaxIdleMs); } - public void setConsumerId(String consumerId) { - configuration.setConsumerId(consumerId); + public String getSslTrustmanagerAlgorithm() { + return configuration.getSslTrustmanagerAlgorithm(); } - public int getRetryBackoffMs() { - return configuration.getRetryBackoffMs(); + public String getSslTruststorePassword() { + return configuration.getSslTruststorePassword(); } - public int getRebalanceMaxRetries() { - return configuration.getRebalanceMaxRetries(); + public void setTimeoutMs(Integer timeoutMs) { + configuration.setTimeoutMs(timeoutMs); } - public Boolean isAutoCommitEnable() { - return configuration.isAutoCommitEnable(); + public void setConsumerStreams(int consumerStreams) { + configuration.setConsumerStreams(consumerStreams); } - public void setQueueBufferingMaxMs(int queueBufferingMaxMs) { - configuration.setQueueBufferingMaxMs(queueBufferingMaxMs); + public String getSslTruststoreType() { + return configuration.getSslTruststoreType(); } - public void setRebalanceMaxRetries(int rebalanceMaxRetries) { - configuration.setRebalanceMaxRetries(rebalanceMaxRetries); + public String getSecurityProtocol() { + return configuration.getSecurityProtocol(); } - public int getZookeeperSessionTimeoutMs() { - return configuration.getZookeeperSessionTimeoutMs(); + public void setBufferMemorySize(Integer bufferMemorySize) { + configuration.setBufferMemorySize(bufferMemorySize); } - public void setKeySerializerClass(String keySerializerClass) { - configuration.setKeySerializerClass(keySerializerClass); + public void setSaslKerberosServiceName(String saslKerberosServiceName) { + configuration.setSaslKerberosServiceName(saslKerberosServiceName); } public void setCompressionCodec(String compressionCodec) { configuration.setCompressionCodec(compressionCodec); } - public void setClientId(String clientId) { - configuration.setClientId(clientId); + public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) { + configuration.setKerberosBeforeReloginMinTime(kerberosBeforeReloginMinTime); } - public int getSocketTimeoutMs() { - return configuration.getSocketTimeoutMs(); + public Integer getMetadataMaxAgeMs() { + return configuration.getMetadataMaxAgeMs(); } - public String getCompressedTopics() { - return configuration.getCompressedTopics(); + public String getSerializerClass() { + return configuration.getSerializerClass(); } - public int getZookeeperSyncTimeMs() { - return configuration.getZookeeperSyncTimeMs(); + public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) { + configuration.setSslKeymanagerAlgorithm(sslKeymanagerAlgorithm); } - public void setSocketReceiveBufferBytes(int socketReceiveBufferBytes) { - configuration.setSocketReceiveBufferBytes(socketReceiveBufferBytes); + public void setMaxRequestSize(Integer maxRequestSize) { + configuration.setMaxRequestSize(maxRequestSize); } - public int getQueueEnqueueTimeoutMs() { - return configuration.getQueueEnqueueTimeoutMs(); + public Double getKerberosRenewJitter() { + return configuration.getKerberosRenewJitter(); } - public int getQueueBufferingMaxMessages() { - return configuration.getQueueBufferingMaxMessages(); + public String getPartitionAssignor() { + return configuration.getPartitionAssignor(); } - public void setZookeeperSyncTimeMs(int zookeeperSyncTimeMs) { - configuration.setZookeeperSyncTimeMs(zookeeperSyncTimeMs); + public Integer getMetadataFetchTimeoutMs() { + return configuration.getMetadataFetchTimeoutMs(); } - public String getKeySerializerClass() { - return configuration.getKeySerializerClass(); + public void setSecurityProtocol(String securityProtocol) { + configuration.setSecurityProtocol(securityProtocol); } - public void setTopicMetadataRefreshIntervalMs(int topicMetadataRefreshIntervalMs) { - configuration.setTopicMetadataRefreshIntervalMs(topicMetadataRefreshIntervalMs); + public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) { + configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages); } - public void setBatchNumMessages(int batchNumMessages) { - configuration.setBatchNumMessages(batchNumMessages); + public String getSaslKerberosServiceName() { + return configuration.getSaslKerberosServiceName(); } - public int getSendBufferBytes() { - return configuration.getSendBufferBytes(); + public void setBatchSize(int batchSize) { + configuration.setBatchSize(batchSize); } - public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) { - configuration.setRebalanceBackoffMs(rebalanceBackoffMs); + public Integer getLingerMs() { + return configuration.getLingerMs(); } - public void setQueuedMaxMessages(int queuedMaxMessages) { - configuration.setQueuedMaxMessageChunks(queuedMaxMessages); + public Integer getRetries() { + return configuration.getRetries(); } - public void setRetryBackoffMs(int retryBackoffMs) { - configuration.setRetryBackoffMs(retryBackoffMs); + public Integer getMaxPartitionFetchBytes() { + return configuration.getMaxPartitionFetchBytes(); } - public int getBatchNumMessages() { - return configuration.getBatchNumMessages(); + public String getSslEndpointAlgorithm() { + return configuration.getSslEndpointAlgorithm(); } - public short getRequestRequiredAcks() { - return configuration.getRequestRequiredAcks(); + public Integer getReconnectBackoffMs() { + return configuration.getReconnectBackoffMs(); } - public String getProducerType() { - return configuration.getProducerType(); + public void setLingerMs(Integer lingerMs) { + configuration.setLingerMs(lingerMs); } - public String getConsumerId() { - return configuration.getConsumerId(); + public void setPartitionAssignor(String partitionAssignor) { + configuration.setPartitionAssignor(partitionAssignor); } - public int getMessageSendMaxRetries() { - return configuration.getMessageSendMaxRetries(); + public Integer getRequestTimeoutMs() { + return configuration.getRequestTimeoutMs(); } - public void setFetchMinBytes(int fetchMinBytes) { - configuration.setFetchMinBytes(fetchMinBytes); + public Properties createConsumerProperties() { + return configuration.createConsumerProperties(); } - public String getSerializerClass() { - return configuration.getSerializerClass(); + public void setTopic(String topic) { + configuration.setTopic(topic); } - public int getRequestTimeoutMs() { - return configuration.getRequestTimeoutMs(); + public Integer getFetchWaitMaxMs() { + return configuration.getFetchWaitMaxMs(); } - @Override - public boolean isMultipleConsumersSupported() { - return true; + public void setSessionTimeoutMs(Integer sessionTimeoutMs) { + configuration.setSessionTimeoutMs(sessionTimeoutMs); } - public boolean isBridgeEndpoint() { - return bridgeEndpoint; + public void setSslEnabledProtocols(String sslEnabledProtocols) { + configuration.setSslEnabledProtocols(sslEnabledProtocols); } - public void setBridgeEndpoint(boolean bridgeEndpoint) { - this.bridgeEndpoint = bridgeEndpoint; + public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) { + configuration.setHeartbeatIntervalMs(heartbeatIntervalMs); } - public String getOffsetsStorage() { - return configuration.getOffsetsStorage(); + public void setMaxBlockMs(Integer maxBlockMs) { + configuration.setMaxBlockMs(maxBlockMs); } - public void setOffsetsStorage(String offsetsStorage) { - configuration.setOffsetsStorage(offsetsStorage); + public void setSslKeystoreLocation(String sslKeystoreLocation) { + configuration.setSslKeystoreLocation(sslKeystoreLocation); } - public Boolean isDualCommitEnabled() { - return configuration.isDualCommitEnabled(); + public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) { + configuration.setMaxPartitionFetchBytes(maxPartitionFetchBytes); + } + + public void setPartitioner(String partitioner) { + configuration.setPartitioner(partitioner); + } + + public String getBrokers() { + return configuration.getBrokers(); } - public void setDualCommitEnabled(boolean dualCommitEnabled) { - configuration.setDualCommitEnabled(dualCommitEnabled); + public Integer getMetricsSampleWindowMs() { + return configuration.getMetricsSampleWindowMs(); } + public Integer getSendBufferBytes() { + return configuration.getSendBufferBytes(); + } + + public Integer getTimeoutMs() { + return configuration.getTimeoutMs(); + } + + public String getSslProtocol() { + return configuration.getSslProtocol(); + } + + public boolean isBridgeEndpoint() { + return bridgeEndpoint; + } + + /** + * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message. + */ + public void setBridgeEndpoint(boolean bridgeEndpoint) { + this.bridgeEndpoint = bridgeEndpoint; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 06a0317..6f9ea79 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -18,20 +18,16 @@ package org.apache.camel.component.kafka; import java.util.Properties; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import org.apache.camel.CamelException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; -/** - * - */ -public class KafkaProducer<K, V> extends DefaultProducer { +public class KafkaProducer extends DefaultProducer { - protected Producer<K, V> producer; + private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; private final KafkaEndpoint endpoint; public KafkaProducer(KafkaEndpoint endpoint) { @@ -39,26 +35,38 @@ public class KafkaProducer<K, V> extends DefaultProducer { this.endpoint = endpoint; } - @Override - protected void doStop() throws Exception { - if (producer != null) { - producer.close(); - } - } - Properties getProps() { Properties props = endpoint.getConfiguration().createProducerProperties(); if (endpoint.getBrokers() != null) { - props.put("metadata.broker.list", endpoint.getBrokers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers()); } return props; } + public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() { + return kafkaProducer; + } + + /** + * To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} instance. + */ + public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) { + this.kafkaProducer = kafkaProducer; + } + + @Override + protected void doStop() throws Exception { + if (kafkaProducer != null) { + kafkaProducer.close(); + } + } + @Override protected void doStart() throws Exception { Properties props = getProps(); - ProducerConfig config = new ProducerConfig(props); - producer = new Producer<K, V>(config); + if (kafkaProducer == null) { + kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props); + } } @Override @@ -71,26 +79,26 @@ public class KafkaProducer<K, V> extends DefaultProducer { if (topic == null) { throw new CamelExchangeException("No topic key set", exchange); } - K partitionKey = (K) exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY); + Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY); boolean hasPartitionKey = partitionKey != null; - K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY); + Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY); boolean hasMessageKey = messageKey != null; - V msg = (V) exchange.getIn().getBody(); - KeyedMessage<K, V> data; + Object msg = exchange.getIn().getBody(); + ProducerRecord record; if (hasPartitionKey && hasMessageKey) { - data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg); - } else if (hasPartitionKey) { - data = new KeyedMessage<K, V>(topic, partitionKey, msg); + record = new ProducerRecord(topic, new Integer(partitionKey.toString()), messageKey, msg); } else if (hasMessageKey) { - data = new KeyedMessage<K, V>(topic, messageKey, msg); + record = new ProducerRecord(topic, messageKey, msg); } else { log.warn("No message key or partition key set"); - data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg); + record = new ProducerRecord(topic, msg); } - producer.send(data); + + // TODO: add support for async callback in the send + kafkaProducer.send(record); } } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java index 0d2b003..d87f885 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java @@ -30,26 +30,23 @@ import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class BaseEmbeddedKafkaTest extends CamelTestSupport { + static EmbeddedZookeeper embeddedZookeeper; static EmbeddedKafkaCluster embeddedKafkaCluster; - - private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class); private static volatile int zookeeperPort; - + private static volatile int karfkaPort; - + @BeforeClass public static void beforeClass() { // start from somewhere in the 23xxx range zookeeperPort = AvailablePortFinder.getNextAvailable(23000); // find another ports for proxy route test karfkaPort = AvailablePortFinder.getNextAvailable(24000); - + embeddedZookeeper = new EmbeddedZookeeper(zookeeperPort); List<Integer> kafkaPorts = new ArrayList<Integer>(); // -1 for any available port @@ -60,9 +57,9 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport { } catch (IOException e) { e.printStackTrace(); } - LOG.info("Embedded Zookeeper connection: " + embeddedZookeeper.getConnection()); + System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection()); embeddedKafkaCluster.startup(); - LOG.info("Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList()); + System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList()); } @AfterClass @@ -70,7 +67,7 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport { embeddedKafkaCluster.shutdown(); embeddedZookeeper.shutdown(); } - + @Override protected JndiRegistry createRegistry() throws Exception { JndiRegistry jndi = super.createRegistry(); @@ -81,7 +78,6 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport { jndi.bind("prop", prop); return jndi; } - @Override protected CamelContext createCamelContext() throws Exception { @@ -89,12 +85,11 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport { context.addComponent("properties", new PropertiesComponent("ref:prop")); return context; } - protected static int getZookeeperPort() { return zookeeperPort; } - + protected static int getKarfkaPort() { return karfkaPort; } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index eb6dd09..31c2dd6 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -18,14 +18,17 @@ package org.apache.camel.component.kafka; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import org.apache.camel.CamelContext; -import org.apache.camel.spi.Registry; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; public class KafkaComponentTest { @@ -34,10 +37,6 @@ public class KafkaComponentTest { @Test public void testPropertiesSet() throws Exception { Map<String, Object> params = new HashMap<String, Object>(); - params.put("zookeeperHost", "somehost"); - params.put("zookeeperPort", 2987); - params.put("portNumber", 14123); - params.put("consumerStreams", "3"); params.put("topic", "mytopic"); params.put("partitioner", "com.class.Party"); @@ -45,97 +44,166 @@ public class KafkaComponentTest { String remaining = "broker1:12345,broker2:12566"; KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); - assertEquals("somehost:2987", endpoint.getZookeeperConnect()); - assertEquals("somehost", endpoint.getZookeeperHost()); - assertEquals(2987, endpoint.getZookeeperPort()); assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); assertEquals("mytopic", endpoint.getTopic()); - assertEquals(3, endpoint.getConsumerStreams()); assertEquals("com.class.Party", endpoint.getPartitioner()); } @Test - public void testZookeeperConnectPropertyOverride() throws Exception { + public void testAllProducerConfigProperty() throws Exception { Map<String, Object> params = new HashMap<String, Object>(); - params.put("zookeeperConnect", "thehost:2181/chroot"); - params.put("zookeeperHost", "somehost"); - params.put("zookeeperPort", 2987); - params.put("portNumber", 14123); - params.put("consumerStreams", "3"); - params.put("topic", "mytopic"); - params.put("partitioner", "com.class.Party"); + setProducerProperty(params); - String uri = "kafka:broker1:12345,broker2:12566"; - String remaining = "broker1:12345,broker2:12566"; + String uri = "kafka:dev1:12345,dev2:12566"; + String remaining = "dev1:12345,dev2:12566"; KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); - assertEquals("thehost:2181/chroot", endpoint.getZookeeperConnect()); - assertNull(endpoint.getZookeeperHost()); - assertEquals(-1, endpoint.getZookeeperPort()); - assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); - assertEquals("mytopic", endpoint.getTopic()); - assertEquals(3, endpoint.getConsumerStreams()); - assertEquals("com.class.Party", endpoint.getPartitioner()); + + assertEquals(new Integer(0), endpoint.getRequestRequiredAcks()); + assertEquals(new Integer(1), endpoint.getBufferMemorySize()); + assertEquals(new Integer(10), endpoint.getProducerBatchSize()); + assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs()); + assertEquals(new Integer(1), endpoint.getMaxBlockMs()); + assertEquals(false, endpoint.getBlockOnBufferFull()); + assertEquals(new Integer(1), endpoint.getBufferMemorySize()); + assertEquals("testing", endpoint.getClientId()); + assertEquals("none", endpoint.getCompressionCodec()); + assertEquals(new Integer(1), endpoint.getLingerMs()); + assertEquals(new Integer(100), endpoint.getMaxRequestSize()); + assertEquals(100, endpoint.getRequestTimeoutMs().intValue()); + assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs()); + assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs()); + assertEquals(new Integer(23), endpoint.getReceiveBufferBytes()); + assertEquals(new Integer(234), endpoint.getReconnectBackoffMs()); + assertEquals(new Integer(0), endpoint.getRetries()); + assertEquals(3782, endpoint.getRetryBackoffMs().intValue()); + assertEquals(765, endpoint.getSendBufferBytes().intValue()); + assertEquals(new Integer(2045), endpoint.getTimeoutMs()); + assertEquals(new Integer(1), endpoint.getMaxInFlightRequest()); + assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters()); + assertEquals(new Integer(3), endpoint.getNoOfMetricsSample()); + assertEquals(new Integer(12344), endpoint.getMetricsSampleWindowMs()); + assertEquals(KafkaConstants.KAFKA_DEFAULT_SERIALIZER, endpoint.getSerializerClass()); + assertEquals(KafkaConstants.KAFKA_DEFAULT_SERIALIZER, endpoint.getKeySerializerClass()); + assertEquals("testing", endpoint.getSslKeyPassword()); + assertEquals("/abc", endpoint.getSslKeystoreLocation()); + assertEquals("testing", endpoint.getSslKeystorePassword()); + assertEquals("/abc", endpoint.getSslTruststoreLocation()); + assertEquals("testing", endpoint.getSslTruststorePassword()); + assertEquals("test", endpoint.getSaslKerberosServiceName()); + assertEquals("PLAINTEXT", endpoint.getSecurityProtocol()); + assertEquals("TLSv1.2", endpoint.getSslEnabledProtocols()); + assertEquals("JKS", endpoint.getSslKeystoreType()); + assertEquals("TLS", endpoint.getSslProtocol()); + assertEquals("test", endpoint.getSslProvider()); + assertEquals("JKS", endpoint.getSslTruststoreType()); + assertEquals("/usr/bin/kinit", endpoint.getKerberosInitCmd()); + assertEquals(new Integer(60000), endpoint.getKerberosBeforeReloginMinTime()); + assertEquals(new Double(0.05), endpoint.getKerberosRenewJitter()); + assertEquals(new Double(0.8), endpoint.getKerberosRenewWindowFactor()); + assertEquals("MAC", endpoint.getSslCipherSuites()); + assertEquals("test", endpoint.getSslEndpointAlgorithm()); + assertEquals("SunX509", endpoint.getSslKeymanagerAlgorithm()); + assertEquals("PKIX", endpoint.getSslTrustmanagerAlgorithm()); } @Test - public void testPropertiesConfigrationMerge() throws Exception { + public void testAllProducerKeys() throws Exception { Map<String, Object> params = new HashMap<String, Object>(); - params.put("portNumber", 14123); - params.put("consumerStreams", "3"); - params.put("topic", "mytopic"); - params.put("partitioner", "com.class.Party"); - KafkaConfiguration kc = new KafkaConfiguration(); - kc.setZookeeperHost("somehost"); - kc.setZookeeperPort(2987); - kc.setTopic("default"); - params.put("configuration", kc); - - String uri = "kafka:broker1:12345,broker2:12566"; - String remaining = "broker1:12345,broker2:12566"; + String uri = "kafka:dev1:12345,dev2:12566"; + String remaining = "dev1:12345,dev2:12566"; KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); - assertEquals("somehost:2987", endpoint.getZookeeperConnect()); - assertEquals("somehost", endpoint.getZookeeperHost()); - assertEquals(2987, endpoint.getZookeeperPort()); - assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); - assertEquals("mytopic", endpoint.getTopic()); - assertEquals(3, endpoint.getConsumerStreams()); - assertEquals("com.class.Party", endpoint.getPartitioner()); - assertNull("dirty", kc.getBrokers()); - assertEquals("default", kc.getTopic()); + assertEquals(endpoint.getConfiguration().createProducerProperties().keySet(), getProducerKeys().keySet()); } - @Test - public void testPropertiesConfigrationRefMerge() throws Exception { - Map<String, Object> params = new HashMap<String, Object>(); - params.put("portNumber", 14123); - params.put("consumerStreams", "3"); - params.put("topic", "mytopic"); - params.put("partitioner", "com.class.Party"); - - KafkaConfiguration kc = new KafkaConfiguration(); - kc.setZookeeperHost("somehost"); - kc.setZookeeperPort(2987); - kc.setTopic("default"); - Registry registry = Mockito.mock(Registry.class); - Mockito.when(registry.lookupByName("baseconf")).thenReturn(kc); - Mockito.when(context.getRegistry()).thenReturn(registry); - params.put("configuration", "#baseconf"); - - String uri = "kafka:broker1:12345,broker2:12566"; - String remaining = "broker1:12345,broker2:12566"; + private Properties getProducerKeys() { + Properties props = new Properties(); + + props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); + props.put(ProducerConfig.RETRIES_CONFIG, "0"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); + props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000"); + props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"); + props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576"); + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER); + props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768"); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); + props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072"); + props.put(ProducerConfig.TIMEOUT_CONFIG, "30000"); + props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); + props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000"); + props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000"); + props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2"); + props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000"); + props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "50"); + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2, TLSv1.1, TLSv1"); + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); + props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS"); + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS"); + props.put(SaslConfigs.SASL_KERBEROS_KINIT_CMD, "/usr/bin/kinit"); + props.put(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "60000"); + props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, "0.05"); + props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, "0.8"); + props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, "SunX509"); + props.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, "PKIX"); + + return props; + } - KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); - assertEquals("somehost:2987", endpoint.getZookeeperConnect()); - assertEquals("somehost", endpoint.getZookeeperHost()); - assertEquals(2987, endpoint.getZookeeperPort()); - assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); - assertEquals("mytopic", endpoint.getTopic()); - assertEquals(3, endpoint.getConsumerStreams()); - assertEquals("com.class.Party", endpoint.getPartitioner()); - assertNull("dirty", kc.getBrokers()); - assertEquals("default", kc.getTopic()); + private void setProducerProperty(Map<String, Object> params) { + params.put("requestRequiredAcks", 0); + params.put("bufferMemorySize", 1); + params.put("compressionCodec", "none"); + params.put("retries", 0); + params.put("producerBatchSize", 10); + params.put("connectionMaxIdleMs", 12); + params.put("lingerMs", 1); + params.put("maxBlockMs", 1); + params.put("maxRequestSize", 100); + params.put("receiveBufferBytes", 23); + params.put("requestTimeoutMs", 100); + params.put("sendBufferBytes", 765); + params.put("timeoutMs", 2045); + params.put("blockOnBufferFull", false); + params.put("maxInFlightRequest", 1); + params.put("metadataFetchTimeoutMs", 9043); + params.put("metadataMaxAgeMs", 1029); + params.put("reconnectBackoffMs", 234); + params.put("retryBackoffMs", 3782); + params.put("noOfMetricsSample", 3); + params.put("metricReporters", "org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport"); + params.put("metricsSampleWindowMs", 12344); + params.put("clientId", "testing"); + params.put("sslKeyPassword", "testing"); + params.put("sslKeystoreLocation", "/abc"); + params.put("sslKeystorePassword", "testing"); + params.put("sslTruststoreLocation", "/abc"); + params.put("sslTruststorePassword", "testing"); + params.put("saslKerberosServiceName", "test"); + params.put("securityProtocol", "PLAINTEXT"); + params.put("sslEnabledProtocols", "TLSv1.2"); + params.put("sslKeystoreType", "JKS"); + params.put("sslProtocol", "TLS"); + params.put("sslProvider", "test"); + params.put("sslTruststoreType", "JKS"); + params.put("kerberosInitCmd", "/usr/bin/kinit"); + params.put("kerberosBeforeReloginMinTime", 60000); + params.put("kerberosRenewJitter", 0.05); + params.put("kerberosRenewWindowFactor", 0.8); + params.put("sslCipherSuites", "MAC"); + params.put("sslEndpointAlgorithm", "test"); + params.put("sslKeymanagerAlgorithm", "SunX509"); + params.put("sslTrustmanagerAlgorithm", "PKIX"); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java index a6f59c0..9128c62 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java @@ -18,13 +18,12 @@ package org.apache.camel.component.kafka; import java.util.Properties; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -34,39 +33,38 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest { public static final String TOPIC = "test"; @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC - + "&zookeeperHost=localhost" - + "&zookeeperPort={{zookeeperPort}}" + "&groupId=group1" - + "&autoOffsetReset=smallest" + + "&autoOffsetReset=earliest" + "&autoCommitEnable=false" + "&batchSize=3" + "&consumerStreams=10" - // If set the consumerTiemout too small the test will fail in JDK7 - + "&consumerTimeoutMs=300" + "&barrierAwaitTimeoutMs=1000" - ) + ) private Endpoint from; @EndpointInject(uri = "mock:result") private MockEndpoint to; - private Producer<String, String> producer; + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; @Before public void before() { Properties props = new Properties(); - props.put("metadata.broker.list", "localhost:" + getKarfkaPort()); - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner"); - props.put("request.required.acks", "1"); - ProducerConfig config = new ProducerConfig(props); - producer = new Producer<String, String>(config); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + + producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props); } @After public void after() { - producer.close(); + if (producer != null) { + producer.close(); + } } @Override @@ -74,47 +72,39 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest { return new RouteBuilder() { @Override public void configure() throws Exception { - from(from).autoStartup(true).to(to).setId("First"); + from(from).routeId("foo").to(to).setId("First"); } }; } @Test public void kafkaMessagesIsConsumedByCamel() throws Exception { + //First 2 must not be committed since batch size is 3 to.expectedBodiesReceivedInAnyOrder("m1", "m2"); for (int k = 1; k <= 2; k++) { String msg = "m" + k; - KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg); + ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg); producer.send(data); } - to.assertIsSatisfied(3000); - + to.assertIsSatisfied(); + to.reset(); + + to.expectedBodiesReceivedInAnyOrder("m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10"); + //Restart endpoint, - from.getCamelContext().stop(); - from.getCamelContext().start(); - - to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10"); + context.stopRoute("foo"); + context.startRoute("foo"); //Second route must wake up and consume all from scratch and commit 9 consumed for (int k = 3; k <= 10; k++) { String msg = "m" + k; - KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg); + ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg); producer.send(data); } - to.assertIsSatisfied(3000); - - to.reset(); - //Restart endpoint, - from.getCamelContext().stop(); - from.getCamelContext().start(); - - - //Only one message should left to consume by this consumer group - to.expectedMessageCount(1); - to.assertIsSatisfied(3000); + to.assertIsSatisfied(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java index c49444b..4f8a0fd 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java @@ -19,46 +19,49 @@ package org.apache.camel.component.kafka; import java.io.IOException; import java.util.Properties; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { public static final String TOPIC = "test"; - @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}" - + "&groupId=group1&autoOffsetReset=smallest") + @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + + "&groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true") private Endpoint from; @EndpointInject(uri = "mock:result") private MockEndpoint to; - private Producer<String, String> producer; + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; @Before public void before() { Properties props = new Properties(); - props.put("metadata.broker.list", "localhost:" + getKarfkaPort()); - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner"); - props.put("request.required.acks", "1"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER); + props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER); + props.put(ProducerConfig.ACKS_CONFIG, "1"); - ProducerConfig config = new ProducerConfig(props); - producer = new Producer<String, String>(config); + producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props); } @After public void after() { - producer.close(); + if (producer != null) { + producer.close(); + } } @Override @@ -78,10 +81,11 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); for (int k = 0; k < 5; k++) { String msg = "message-" + k; - KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg); + ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, "1", msg); producer.send(data); } to.assertIsSatisfied(3000); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java index b51c09e..86bc163 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java @@ -28,21 +28,21 @@ public class KafkaConsumerTest { private Processor processor = mock(Processor.class); @Test(expected = IllegalArgumentException.class) - public void consumerRequiresZookeeperConnect() throws Exception { + public void consumerRequiresBootstrapServers() throws Exception { Mockito.when(endpoint.getGroupId()).thenReturn("groupOne"); new KafkaConsumer(endpoint, processor); } @Test(expected = IllegalArgumentException.class) public void consumerRequiresGroupId() throws Exception { - Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot"); + Mockito.when(endpoint.getBrokers()).thenReturn("localhost:1234"); new KafkaConsumer(endpoint, processor); } @Test - public void consumerOnlyRequiresZookeeperConnectAndGroupId() throws Exception { + public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception { Mockito.when(endpoint.getGroupId()).thenReturn("groupOne"); - Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot"); + Mockito.when(endpoint.getBrokers()).thenReturn("localhost:2181"); new KafkaConsumer(endpoint, processor); } } http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java index be16766..bd25886 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java @@ -18,11 +18,8 @@ package org.apache.camel.component.kafka; import java.net.URISyntaxException; -import kafka.message.Message; -import kafka.message.MessageAndMetadata; - -import kafka.serializer.DefaultDecoder; import org.apache.camel.Exchange; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -35,12 +32,8 @@ public class KafkaEndpointTest { KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent()); endpoint.setBrokers("localhost"); - Message message = new Message("mymessage".getBytes(), "somekey".getBytes()); - DefaultDecoder decoder = new DefaultDecoder(null); - MessageAndMetadata<byte[], byte[]> mm = - new MessageAndMetadata<byte[], byte[]>("topic", 4, message, 56, decoder, decoder); - - Exchange exchange = endpoint.createKafkaExchange(mm); + ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("topic", 4, 56, "somekey", ""); + Exchange exchange = endpoint.createKafkaExchange(record); assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY)); assertEquals("topic", exchange.getIn().getHeader(KafkaConstants.TOPIC)); assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION));