KYLIN-2700 Should allow user to override Kafka conf in cube level Signed-off-by: Billy Liu <billy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/40e4d627 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/40e4d627 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/40e4d627 Branch: refs/heads/KYLIN-2606 Commit: 40e4d627227f584d6212ef588eb3fd658bf159dc Parents: 1eb32c3 Author: qiumingming <qiumingm...@bytedance.com> Authored: Fri Jun 30 18:31:52 2017 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Fri Jun 30 18:40:10 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/40e4d627/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index 11466e5..e98b784 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -104,7 +104,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())); - appendKafkaOverrideProperties(KylinConfig.getInstanceFromEnv(), job.getConfiguration()); + appendKafkaOverrideProperties(cube.getConfig(), job.getConfiguration()); job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));