KYLIN-2131: fix KafkaClient's dependency on a local config file
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3eba4d97 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3eba4d97 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3eba4d97 Branch: refs/heads/KYLIN-2131 Commit: 3eba4d978a246659954080bdc210d0a727ee4942 Parents: cf5d494 Author: Billy Liu <billy...@apache.org> Authored: Sat Dec 17 16:39:24 2016 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Sun Dec 18 14:15:13 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/source/kafka/KafkaSource.java | 4 +--- .../org/apache/kylin/source/kafka/util/KafkaClient.java | 9 ++------- 2 files changed, 3 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3eba4d97/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 6c1ac1f..7a0363f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -35,7 +35,6 @@ import org.apache.kylin.source.ISource; import org.apache.kylin.source.ReadableTable; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import org.apache.kylin.source.kafka.util.KafkaClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,8 +93,7 @@ public class KafkaSource implements ISource { final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable()); 
final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); final String topic = kafakaConfig.getTopic(); - final Properties kafkaProperties = KafkaConsumerProperties.getInstanceFromEnv().getProperties(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), kafkaProperties)) { + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) { http://git-wip-us.apache.org/repos/asf/kylin/blob/3eba4d97/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index 51339c7..69d7440 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -29,7 +29,6 @@ import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import java.util.Arrays; import java.util.List; @@ -40,8 +39,6 @@ import java.util.Properties; */ public class KafkaClient { - private static KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv(); - public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) { Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); @@ -106,8 +103,7 @@ public class KafkaClient { final String topic = kafakaConfig.getTopic(); Map<Integer, Long> startOffsets = Maps.newHashMap(); - Properties kafkaProperties = kafkaFileConfig.getProperties(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) { + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { long latest = getLatestOffset(consumer, topic, partitionInfo.partition()); @@ -125,8 +121,7 @@ public class KafkaClient { final String topic = kafakaConfig.getTopic(); Map<Integer, Long> startOffsets = Maps.newHashMap(); - Properties kafkaProperties = kafkaFileConfig.getProperties(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) { + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());