Repository: kylin
Updated Branches:
  refs/heads/master 78a591798 -> 642981746


KYLIN-2296 support cube level kafka config overwrite


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/64298174
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/64298174
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/64298174

Branch: refs/heads/master
Commit: 642981746b0db15a66a854fdd45876bf685667b5
Parents: 78a5917
Author: Billy Liu <billy...@apache.org>
Authored: Thu Dec 22 15:02:01 2016 +0800
Committer: Billy Liu <billy...@apache.org>
Committed: Thu Dec 22 15:02:01 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  8 +++++++
 .../kylin/rest/controller/CubeController.java   |  2 +-
 .../apache/kylin/source/kafka/KafkaSource.java  |  8 +++----
 .../kafka/config/KafkaConsumerProperties.java   |  8 +++----
 .../source/kafka/hadoop/KafkaFlatTableJob.java  | 25 ++++++++++++++------
 .../source/kafka/hadoop/KafkaInputFormat.java   |  2 +-
 .../kafka/hadoop/KafkaInputRecordReader.java    |  2 +-
 .../kylin/source/kafka/util/KafkaClient.java    | 14 +++++------
 .../config/KafkaConsumerPropertiesTest.java     |  8 +++----
 9 files changed, 48 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 11e48ac..168e5b5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -497,6 +497,14 @@ abstract public class KylinConfigBase implements 
Serializable {
     }
 
     // 
============================================================================
+    // SOURCE.KAFKA
+    // 
============================================================================
+
+    public Map<String, String> getKafkaConfigOverride() {
+        return getPropertiesByPrefix("kylin.source.kafka.config-override.");
+    }
+
+    // 
============================================================================
     // STORAGE.HBASE
     // 
============================================================================
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 6e3668b..978f477 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -650,7 +650,7 @@ public class CubeController extends BasicController {
 
         final GeneralResponse response = new GeneralResponse();
         try {
-            final Map<Integer, Long> startOffsets = 
KafkaClient.getCurrentOffsets(cubeInstance);
+            final Map<Integer, Long> startOffsets = 
KafkaClient.getLatestOffsets(cubeInstance);
             CubeDesc desc = cubeInstance.getDescriptor();
             desc.setPartitionOffsetStart(startOffsets);
             cubeService.getCubeDescManager().updateCubeDesc(desc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 12f3da2..6689c6e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -89,9 +89,9 @@ public class KafkaSource implements ISource {
             }
         }
 
-        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
+        final KafkaConfig kafkaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable());
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
         try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
             final List<PartitionInfo> partitionInfos = 
consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
@@ -107,7 +107,7 @@ public class KafkaSource implements ISource {
 
         if (result.getEndOffset() == Long.MAX_VALUE) {
             logger.debug("Seek end offsets from topic");
-            Map<Integer, Long> latestOffsets = 
KafkaClient.getCurrentOffsets(cube);
+            Map<Integer, Long> latestOffsets = 
KafkaClient.getLatestOffsets(cube);
             logger.debug("The end offsets are " + latestOffsets);
 
             for (Integer partitionId : latestOffsets.keySet()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 5bc1a82..8f0dd42 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -75,7 +75,7 @@ public class KafkaConsumerProperties {
         return new File(path, KAFKA_CONSUMER_FILE);
     }
 
-    public static Properties getProperties(Configuration configuration) {
+    public static Properties extractKafkaConfigToProperties(Configuration 
configuration) {
         Set<String> configNames = new HashSet<String>();
         try {
             configNames = ConsumerConfig.configNames();
@@ -109,7 +109,7 @@ public class KafkaConsumerProperties {
             FileInputStream is = new FileInputStream(propFile);
             Configuration conf = new Configuration();
             conf.addResource(is);
-            properties.putAll(getProperties(conf));
+            properties.putAll(extractKafkaConfigToProperties(conf));
             IOUtils.closeQuietly(is);
 
             File propOverrideFile = new File(propFile.getParentFile(), 
propFile.getName() + ".override");
@@ -118,7 +118,7 @@ public class KafkaConsumerProperties {
                 Properties propOverride = new Properties();
                 Configuration oconf = new Configuration();
                 oconf.addResource(ois);
-                properties.putAll(getProperties(oconf));
+                properties.putAll(extractKafkaConfigToProperties(oconf));
                 IOUtils.closeQuietly(ois);
             }
         } catch (IOException e) {
@@ -151,7 +151,7 @@ public class KafkaConsumerProperties {
         return getKafkaConsumerFile(path);
     }
 
-    public Properties getProperties() {
+    public Properties extractKafkaConfigToProperties() {
         Properties prop = new Properties();
         prop.putAll(this.properties);
         return prop;

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 40f12ee..f0f48c0 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.source.kafka.hadoop;
 
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
-import org.apache.kylin.source.kafka.util.KafkaClient;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -34,15 +36,14 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
+import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
 /**
  * Run a Hadoop Job to process the stream data in kafka;
  * Modified from the kafka-hadoop-loader in 
https://github.com/amient/kafka-hadoop-loader
@@ -103,6 +104,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().addResource(new 
Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
             KafkaConsumerProperties kafkaConsumerProperties = 
KafkaConsumerProperties.getInstanceFromEnv();
             job.getConfiguration().addResource(new 
Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
+            appendKafkaOverrideProperties(KylinConfig.getInstanceFromEnv(), 
job.getConfiguration());
             job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
             job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, 
String.valueOf(kafkaConfig.getTimeout()));
@@ -157,6 +159,15 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
         job.setNumReduceTasks(0);
     }
 
+    private static void appendKafkaOverrideProperties(final KylinConfig 
kylinConfig, Configuration conf) {
+        final Map<String, String> kafkaConfOverride = 
kylinConfig.getKafkaConfigOverride();
+        if (kafkaConfOverride.isEmpty() == false) {
+            for (String key : kafkaConfOverride.keySet()) {
+                conf.set(key, kafkaConfOverride.get(key), "kafka");
+            }
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         KafkaFlatTableJob job = new KafkaFlatTableJob();
         int exitCode = ToolRunner.run(job, args);

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 96bac3f..c996c5f 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -67,7 +67,7 @@ public class KafkaInputFormat extends 
InputFormat<LongWritable, BytesWritable> {
             }
         }
 
-        Properties kafkaProperties = 
KafkaConsumerProperties.getProperties(conf);
+        Properties kafkaProperties = 
KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
         final List<InputSplit> splits = new ArrayList<InputSplit>();
         try (KafkaConsumer<String, String> consumer = 
KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = 
consumer.partitionsFor(inputTopic);

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index e8bd76c..c22c72f 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -87,7 +87,7 @@ public class KafkaInputRecordReader extends 
RecordReader<LongWritable, BytesWrit
         }
         String consumerGroup = 
conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
 
-        Properties kafkaProperties = 
KafkaConsumerProperties.getProperties(conf);
+        Properties kafkaProperties = 
KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
 
         consumer = 
org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, 
consumerGroup, kafkaProperties);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 69d7440..bd8f90e 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -96,11 +96,11 @@ public class KafkaClient {
         return consumer.position(topicPartition);
     }
 
-    public static Map<Integer, Long> getCurrentOffsets(final CubeInstance 
cubeInstance) {
-        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
+    public static Map<Integer, Long> getLatestOffsets(final CubeInstance 
cubeInstance) {
+        final KafkaConfig kafkaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
 
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
         try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
@@ -115,10 +115,10 @@ public class KafkaClient {
 
 
     public static Map<Integer, Long> getEarliestOffsets(final CubeInstance 
cubeInstance) {
-        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
+        final KafkaConfig kafkaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable());
 
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
 
         Map<Integer, Long> startOffsets = Maps.newHashMap();
         try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
 
b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index 8edb84d..1d7863b 100644
--- 
a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ 
b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -32,9 +32,9 @@ public class KafkaConsumerPropertiesTest extends 
LocalFileMetadataTestCase {
     @Test
     public void testLoadKafkaProperties() {
         KafkaConsumerProperties kafkaConsumerProperties = 
KafkaConsumerProperties.getInstanceFromEnv();
-        
assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
-        
assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
-        assertEquals("30000", 
kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
+        
assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks"));
+        
assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms"));
+        assertEquals("30000", 
kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms"));
     }
 
     @Test
@@ -44,7 +44,7 @@ public class KafkaConsumerPropertiesTest extends 
LocalFileMetadataTestCase {
         conf.addResource(new FileInputStream(new 
File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), 
KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
         assertEquals("30000", conf.get("session.timeout.ms"));
 
-        Properties prop = KafkaConsumerProperties.getProperties(conf);
+        Properties prop = 
KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
         assertEquals("30000", prop.getProperty("session.timeout.ms"));
     }
 }

Reply via email to