This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch 2.6.x in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.6.x by this push: new f2eccab add overwrite method getKafkaConsumer in KafkaClient (#658) f2eccab is described below commit f2eccabbdf780345ad39e17bbfbd146f005a8c48 Author: liuzx32 <liuz...@163.com> AuthorDate: Tue Jun 11 15:45:10 2019 +0800 add overwrite method getKafkaConsumer in KafkaClient (#658) * add overwrite method getKafkaConsumer in KafkaClient --- .../java/org/apache/kylin/source/kafka/util/KafkaClient.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index a0cb59a..a781f8a 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -29,6 +29,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import java.util.Arrays; import java.util.List; @@ -43,6 +44,11 @@ public class KafkaClient { throw new IllegalStateException("Class KafkaClient is an utility class !"); } + public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup) { + Properties properties = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties(); + return getKafkaConsumer(brokers, consumerGroup, properties); + } + public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) { Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); @@ -107,7 +113,7 @@ public class KafkaClient { final String topic = kafkaConfig.getTopic(); Map<Integer, Long> startOffsets = Maps.newHashMap(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { long latest = getLatestOffset(consumer, topic, partitionInfo.partition()); @@ -125,7 +131,7 @@ public class KafkaClient { final String topic = kafkaConfig.getTopic(); Map<Integer, Long> startOffsets = Maps.newHashMap(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());