Repository: camel
Updated Branches:
  refs/heads/master 27664e8f6 -> 3938344aa


Upgrade Kafka and related bundle to version 0.10.0.0


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3938344a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3938344a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3938344a

Branch: refs/heads/master
Commit: 3938344aa682e8ba2f1766e272b8feb6bf91d682
Parents: 27664e8
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Wed Jun 8 11:48:54 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Wed Jun 8 11:48:54 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 4 +++-
 .../camel/component/kafka/embedded/EmbeddedKafkaCluster.java | 3 ++-
 parent/pom.xml                                               | 4 ++--
 3 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 8649a46..a317b54 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
@@ -113,7 +115,7 @@ public class KafkaConsumer extends DefaultConsumer {
         LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
         // This poll to ensures we have an assigned partition otherwise seek won't work
         consumer.poll(100);
-        consumer.seekToBeginning();
+        consumer.seekToBeginning(consumer.assignment());
     }
     while (isRunAllowed() && !isSuspendingOrSuspended()) {
         ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index 42403c2..69de777 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Properties;
 
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.ZkUtils;
@@ -66,7 +67,7 @@ public class EmbeddedKafkaCluster {
 
     public void createTopics(String... topics) {
         for (String topic : topics) {
-            AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties());
+            AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3938344a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 286d537..7224373 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -352,8 +352,8 @@
     <jython-version>2.5.3</jython-version>
     <jzlib-version>1.1.3</jzlib-version>
     <jzlib-bundle-version>1.1.3_2</jzlib-bundle-version>
-    <kafka-version>0.9.0.1</kafka-version>
-    <kafka-bundle-version>0.9.0.1_1</kafka-bundle-version>
+    <kafka-version>0.10.0.0</kafka-version>
+    <kafka-bundle-version>0.10.0.0_1</kafka-bundle-version>
     <karaf-version>2.4.4</karaf-version>
     <karaf3-version>3.0.6</karaf3-version>
     <karaf4-version>4.0.5</karaf4-version>