This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new f2eccab  add overwrite method getKafkaConsumer in KafkaClient (#658)
f2eccab is described below

commit f2eccabbdf780345ad39e17bbfbd146f005a8c48
Author: liuzx32 <liuz...@163.com>
AuthorDate: Tue Jun 11 15:45:10 2019 +0800

    add overwrite method getKafkaConsumer in KafkaClient (#658)
    
    * add overwrite method getKafkaConsumer in KafkaClient
---
 .../java/org/apache/kylin/source/kafka/util/KafkaClient.java   | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index a0cb59a..a781f8a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -29,6 +29,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 
 import java.util.Arrays;
 import java.util.List;
@@ -43,6 +44,11 @@ public class KafkaClient {
         throw new IllegalStateException("Class KafkaClient is an utility class 
!");
     }
 
+    public static KafkaConsumer getKafkaConsumer(String brokers, String 
consumerGroup) {
+        Properties properties = 
KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
+        return getKafkaConsumer(brokers, consumerGroup, properties);
+    }
+
     public static KafkaConsumer getKafkaConsumer(String brokers, String 
consumerGroup, Properties properties) {
         Properties props = constructDefaultKafkaConsumerProperties(brokers, 
consumerGroup, properties);
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
@@ -107,7 +113,7 @@ public class KafkaClient {
         final String topic = kafkaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
-        try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+        try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
             final List<PartitionInfo> partitionInfos = 
consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 long latest = getLatestOffset(consumer, topic, 
partitionInfo.partition());
@@ -125,7 +131,7 @@ public class KafkaClient {
         final String topic = kafkaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
-        try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+        try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName())) {
             final List<PartitionInfo> partitionInfos = 
consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
                 long latest = getEarliestOffset(consumer, topic, 
partitionInfo.partition());

Reply via email to