KYLIN-2131: fix passing Kafka consumer properties to the Hadoop configuration
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b8a6118b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b8a6118b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b8a6118b Branch: refs/heads/KYLIN-2131 Commit: b8a6118b7cdd7a58a2ccbf3dcace31dd87095efe Parents: 656e908 Author: Billy Liu <billy...@apache.org> Authored: Fri Dec 16 19:01:20 2016 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Sun Dec 18 14:15:13 2016 +0800 ---------------------------------------------------------------------- .../localmeta/kylin-kafka-consumer.properties | 19 ---- .../localmeta/kylin-kafka-consumer.xml | 27 ++++++ .../sandbox/kylin-kafka-consumer.properties | 19 ---- .../sandbox/kylin-kafka-consumer.xml | 27 ++++++ .../kafka/config/KafkaConsumerProperties.java | 94 ++++++++++++-------- .../source/kafka/hadoop/KafkaFlatTableJob.java | 5 +- .../source/kafka/hadoop/KafkaInputFormat.java | 6 +- .../kafka/hadoop/KafkaInputRecordReader.java | 6 +- .../config/KafkaConsumerPropertiesTest.java | 26 +++++- 9 files changed, 142 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/examples/test_case_data/localmeta/kylin-kafka-consumer.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.properties b/examples/test_case_data/localmeta/kylin-kafka-consumer.properties deleted file mode 100644 index d198f4e..0000000 --- a/examples/test_case_data/localmeta/kylin-kafka-consumer.properties +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs -session.timeout.ms=30000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.xml b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml new file mode 100644 index 0000000..6da7004 --- /dev/null +++ b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml @@ -0,0 +1,27 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs +--> +<configuration> + <property> + <name>session.timeout.ms</name> + <value>30000</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/examples/test_case_data/sandbox/kylin-kafka-consumer.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.properties b/examples/test_case_data/sandbox/kylin-kafka-consumer.properties deleted file mode 100644 index d198f4e..0000000 --- a/examples/test_case_data/sandbox/kylin-kafka-consumer.properties +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs -session.timeout.ms=30000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.xml b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml new file mode 100644 index 0000000..6da7004 --- /dev/null +++ b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml @@ -0,0 +1,27 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!-- + for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs +--> +<configuration> + <property> + <name>session.timeout.ms</name> + <value>30000</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java index 7a15e63..29589d5 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java @@ -18,37 +18,40 @@ package org.apache.kylin.source.kafka.config; -import org.apache.commons.io.FileUtils; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigCannotInitException; +import org.apache.kylin.common.util.OptionsHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - public class KafkaConsumerProperties { + public static final String KAFKA_CONSUMER_FILE = "kylin-kafka-consumer.xml"; private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProperties.class); - + // static cached instances + private static KafkaConsumerProperties ENV_INSTANCE = null; private 
volatile Properties properties = new Properties(); - public static final String KAFKA_CONSUMER_PROPERTIES_FILE = "kylin-kafka-consumer.properties"; + private KafkaConsumerProperties() { - // static cached instances - private static KafkaConsumerProperties ENV_INSTANCE = null; + } public static KafkaConsumerProperties getInstanceFromEnv() { synchronized (KafkaConsumerProperties.class) { if (ENV_INSTANCE == null) { try { KafkaConsumerProperties config = new KafkaConsumerProperties(); - config.properties = loadKafkaConsumerProperties(); + config.properties = config.loadKafkaConsumerProperties(); logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config)); ENV_INSTANCE = config; @@ -60,50 +63,69 @@ public class KafkaConsumerProperties { } } - public Properties getProperties(){ - Properties result = new Properties(); - result.putAll(properties); - return result; + private static File getKafkaConsumerFile(String path) { + if (path == null) { + return null; + } + + return new File(path, KAFKA_CONSUMER_FILE); } - public InputStream getHadoopJobConfInputStream() throws IOException { - File kafkaProperties = getKafkaConsumerPropertiesFile(); - return FileUtils.openInputStream(kafkaProperties); + public static Properties getProperties(Configuration configuration) { + Properties result = new Properties(); + for (Iterator<Map.Entry<String, String>> it = configuration.iterator(); it.hasNext();) { + Map.Entry<String, String> entry = it.next(); + String key = entry.getKey(); + String value = entry.getValue(); + result.put(key, value); + } + // TODO: Not filter non-kafka properties, no issue, but some annoying logs + // Tried to leverage Kafka API to find non used properties, but the API is + // not open to public + return result; } - private static Properties loadKafkaConsumerProperties() { - File propFile = getKafkaConsumerPropertiesFile(); + private Properties loadKafkaConsumerProperties() { + File propFile = 
getKafkaConsumerFile(); if (propFile == null || !propFile.exists()) { - logger.error("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE); - throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE); + logger.error("fail to locate " + KAFKA_CONSUMER_FILE); + throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_FILE); } - Properties conf = new Properties(); + Properties properties = new Properties(); try { FileInputStream is = new FileInputStream(propFile); - conf.load(is); + Configuration conf = new Configuration(); + conf.addResource(is); + properties.putAll(getProperties(conf)); IOUtils.closeQuietly(is); File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override"); if (propOverrideFile.exists()) { FileInputStream ois = new FileInputStream(propOverrideFile); Properties propOverride = new Properties(); - propOverride.load(ois); + Configuration oconf = new Configuration(); + oconf.addResource(ois); + properties.putAll(getProperties(oconf)); IOUtils.closeQuietly(ois); - conf.putAll(propOverride); } } catch (IOException e) { throw new RuntimeException(e); } - return conf; + return properties; + } + + public String getKafkaConsumerHadoopJobConf(){ + File kafkaConsumerFile = getKafkaConsumerFile(); + return OptionsHelper.convertToFileURL(kafkaConsumerFile.getAbsolutePath()); } - private static File getKafkaConsumerPropertiesFile() { + private File getKafkaConsumerFile() { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF); if (!StringUtils.isEmpty(kylinConfHome)) { logger.info("Use KYLIN_CONF=" + kylinConfHome); - return getKafkaConsumerPropertiesFile(kylinConfHome); + return getKafkaConsumerFile(kylinConfHome); } logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable"); @@ -113,14 +135,12 @@ public class KafkaConsumerProperties { throw new KylinConfigCannotInitException("Didn't find KYLIN_CONF or KYLIN_HOME, 
please set one of them"); String path = kylinHome + File.separator + "conf"; - return getKafkaConsumerPropertiesFile(path); + return getKafkaConsumerFile(path); } - private static File getKafkaConsumerPropertiesFile(String path) { - if (path == null) { - return null; - } - - return new File(path, KAFKA_CONSUMER_PROPERTIES_FILE); + public Properties getProperties() { + Properties prop = new Properties(); + prop.putAll(this.properties); + return prop; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index e030719..40f12ee 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -61,7 +61,6 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group"; public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format"; public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name"; - public static final String CONFIG_KAFKA_CONSUMER_PROPERTIES = "kafka.consumer.properties"; @Override public int run(String[] args) throws Exception { @@ -102,8 +101,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); - KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv(); - job.getConfiguration().addResource(kafkaFileConfig.getHadoopJobConfInputStream(), 
CONFIG_KAFKA_CONSUMER_PROPERTIES); + KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); + job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())); job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index 0aab72e..96bac3f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -19,7 +19,6 @@ package org.apache.kylin.source.kafka.hadoop; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import org.apache.kylin.source.kafka.util.KafkaClient; import com.google.common.base.Preconditions; @@ -67,9 +67,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { } } - InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES); - Properties kafkaProperties = new Properties(); - kafkaProperties.load(inputStream); + Properties kafkaProperties = 
KafkaConsumerProperties.getProperties(conf); final List<InputSplit> splits = new ArrayList<InputSplit>(); try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic); http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java index dfd0e59..e8bd76c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java @@ -19,7 +19,6 @@ package org.apache.kylin.source.kafka.hadoop; import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; @@ -35,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,9 +87,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit } String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); - InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES); - Properties kafkaProperties = new Properties(); - kafkaProperties.load(inputStream); + Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf); consumer = 
org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties); http://git-wip-us.apache.org/repos/asf/kylin/blob/b8a6118b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java index 3690439..378ec73 100644 --- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java @@ -4,10 +4,24 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.util.Arrays; +import java.util.Properties; + +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.xml.sax.SAXException; public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase { @Before @@ -21,11 +35,21 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase { } @Test - public void testLoadKafkaConfig() { + public void testLoadKafkaProperties() { KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks")); assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms")); 
assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms")); } + @Test + public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException { + KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); + Configuration conf = new Configuration(false); + conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE); + assertEquals("30000", conf.get("session.timeout.ms")); + + Properties prop = KafkaConsumerProperties.getProperties(conf); + assertEquals("30000", prop.getProperty("session.timeout.ms")); + } }