Repository: kylin Updated Branches: refs/heads/master 78a591798 -> 642981746
KYLIN-2296 support cube level kafka config overwrite Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/64298174 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/64298174 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/64298174 Branch: refs/heads/master Commit: 642981746b0db15a66a854fdd45876bf685667b5 Parents: 78a5917 Author: Billy Liu <billy...@apache.org> Authored: Thu Dec 22 15:02:01 2016 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Thu Dec 22 15:02:01 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 8 +++++++ .../kylin/rest/controller/CubeController.java | 2 +- .../apache/kylin/source/kafka/KafkaSource.java | 8 +++---- .../kafka/config/KafkaConsumerProperties.java | 8 +++---- .../source/kafka/hadoop/KafkaFlatTableJob.java | 25 ++++++++++++++------ .../source/kafka/hadoop/KafkaInputFormat.java | 2 +- .../kafka/hadoop/KafkaInputRecordReader.java | 2 +- .../kylin/source/kafka/util/KafkaClient.java | 14 +++++------ .../config/KafkaConsumerPropertiesTest.java | 8 +++---- 9 files changed, 48 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 11e48ac..168e5b5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -497,6 +497,14 @@ abstract public class KylinConfigBase implements Serializable { } // ============================================================================ + // SOURCE.KAFKA + // ============================================================================ + + public Map<String, String> getKafkaConfigOverride() { + return getPropertiesByPrefix("kylin.source.kafka.config-override."); + } + + // ============================================================================ // STORAGE.HBASE // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 6e3668b..978f477 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -650,7 +650,7 @@ public class CubeController extends BasicController { final GeneralResponse response = new GeneralResponse(); try { - final Map<Integer, Long> startOffsets = KafkaClient.getCurrentOffsets(cubeInstance); + final Map<Integer, Long> startOffsets = KafkaClient.getLatestOffsets(cubeInstance); CubeDesc desc = cubeInstance.getDescriptor(); desc.setPartitionOffsetStart(startOffsets); cubeService.getCubeDescManager().updateCubeDesc(desc); http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 12f3da2..6689c6e 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -89,9 +89,9 @@ public class KafkaSource implements ISource { } } - final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable()); - final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); - final String topic = kafakaConfig.getTopic(); + final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable()); + final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); + final String topic = kafkaConfig.getTopic(); try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { @@ -107,7 +107,7 @@ public class KafkaSource implements ISource { if (result.getEndOffset() == Long.MAX_VALUE) { logger.debug("Seek end offsets from topic"); - Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube); + Map<Integer, Long> latestOffsets = KafkaClient.getLatestOffsets(cube); logger.debug("The end offsets are " + latestOffsets); for (Integer partitionId : latestOffsets.keySet()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java index 5bc1a82..8f0dd42 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java @@ -75,7 +75,7 @@ public class KafkaConsumerProperties { return new File(path, KAFKA_CONSUMER_FILE); } - public static Properties getProperties(Configuration configuration) { + public static Properties extractKafkaConfigToProperties(Configuration configuration) { Set<String> configNames = new HashSet<String>(); try { configNames = ConsumerConfig.configNames(); @@ -109,7 +109,7 @@ public class KafkaConsumerProperties { FileInputStream is = new FileInputStream(propFile); Configuration conf = new Configuration(); conf.addResource(is); - properties.putAll(getProperties(conf)); + properties.putAll(extractKafkaConfigToProperties(conf)); IOUtils.closeQuietly(is); File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override"); @@ -118,7 +118,7 @@ public class KafkaConsumerProperties { Properties propOverride = new Properties(); Configuration oconf = new Configuration(); oconf.addResource(ois); - properties.putAll(getProperties(oconf)); + properties.putAll(extractKafkaConfigToProperties(oconf)); IOUtils.closeQuietly(ois); } } catch (IOException e) { @@ -151,7 +151,7 @@ public class KafkaConsumerProperties { return getKafkaConsumerFile(path); } - public Properties getProperties() { + public Properties extractKafkaConfigToProperties() { Properties prop = new Properties(); prop.putAll(this.properties); return prop; http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index 40f12ee..f0f48c0 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -18,10 +18,12 @@ package org.apache.kylin.source.kafka.hadoop; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; -import org.apache.kylin.source.kafka.util.KafkaClient; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -34,15 +36,14 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; +import org.apache.kylin.source.kafka.util.KafkaClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; - /** * Run a Hadoop Job to process the stream data in kafka; * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader @@ -103,6 +104,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())); + appendKafkaOverrideProperties(KylinConfig.getInstanceFromEnv(), job.getConfiguration()); job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); @@ -157,6 +159,15 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.setNumReduceTasks(0); } + private static void appendKafkaOverrideProperties(final KylinConfig kylinConfig, Configuration conf) { + final Map<String, String> kafkaConfOverride = kylinConfig.getKafkaConfigOverride(); + if (kafkaConfOverride.isEmpty() == false) { + for (String key : kafkaConfOverride.keySet()) { + conf.set(key, kafkaConfOverride.get(key), "kafka"); + } + } + } + public static void main(String[] args) throws Exception { KafkaFlatTableJob job = new KafkaFlatTableJob(); int exitCode = ToolRunner.run(job, args); http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index 96bac3f..c996c5f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -67,7 +67,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { } } - Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf); + Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf); final List<InputSplit> splits = new ArrayList<InputSplit>(); try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic); http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java index e8bd76c..c22c72f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java @@ -87,7 +87,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit } String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); - Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf); + Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf); consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties); http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index 69d7440..bd8f90e 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -96,11 +96,11 @@ public class KafkaClient { return consumer.position(topicPartition); } - public static Map<Integer, Long> getCurrentOffsets(final CubeInstance cubeInstance) { - final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable()); + public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) { + final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable()); - final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); - final String topic = kafakaConfig.getTopic(); + final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); + final String topic = kafkaConfig.getTopic(); Map<Integer, Long> startOffsets = Maps.newHashMap(); try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { @@ -115,10 +115,10 @@ public class KafkaClient { public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) { - final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable()); + final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable()); - final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); - final String topic = kafakaConfig.getTopic(); + final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); + final String topic = kafkaConfig.getTopic(); Map<Integer, Long> startOffsets = Maps.newHashMap(); try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/64298174/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java index 8edb84d..1d7863b 100644 --- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java @@ -32,9 +32,9 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase { @Test public void testLoadKafkaProperties() { KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); - assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks")); - assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms")); - assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms")); + assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks")); + assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms")); + assertEquals("30000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms")); } @Test @@ -44,7 +44,7 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase { conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE); assertEquals("30000", conf.get("session.timeout.ms")); - Properties prop = KafkaConsumerProperties.getProperties(conf); + Properties prop = KafkaConsumerProperties.extractKafkaConfigToProperties(conf); assertEquals("30000", prop.getProperty("session.timeout.ms")); } }