Repository: kylin
Updated Branches:
  refs/heads/master 8cb8f279b -> 9dbc24bf3

KYLIN-2131 Load Kafka Consumer Configuration from kylin-kafka-consumer.xml

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9dbc24bf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9dbc24bf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9dbc24bf

Branch: refs/heads/master
Commit: 9dbc24bf320ff761ae83028f868feb4a4424f083
Parents: 8cb8f27
Author: Billy Liu <billy...@apache.org>
Authored: Wed Dec 21 17:44:53 2016 +0800
Committer: Billy Liu <billy...@apache.org>
Committed: Wed Dec 21 17:44:53 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/Kafka10DataLoader.java  |  35 +++-
 build/conf/kylin-kafka-consumer.xml             |  27 ++++
 .../localmeta/kylin-kafka-consumer.xml          |  27 ++++
 .../sandbox/kylin-kafka-consumer.xml            |  27 ++++
 pom.xml                                         |   2 +-
 .../apache/kylin/source/kafka/KafkaSource.java  |   4 +-
 .../source/kafka/config/KafkaClusterConfig.java |   4 -
 .../kylin/source/kafka/config/KafkaConfig.java  |  11 --
 .../kafka/config/KafkaConsumerProperties.java   | 159 +++++++++++++++++++
 .../source/kafka/hadoop/KafkaFlatTableJob.java  |   6 +-
 .../source/kafka/hadoop/KafkaInputFormat.java   |   5 +-
 .../kafka/hadoop/KafkaInputRecordReader.java    |  16 +-
 .../kylin/source/kafka/util/KafkaClient.java    |  34 +---
 .../config/KafkaConsumerPropertiesTest.java     |  50 ++++++
 webapp/app/js/model/streamingModel.js           |   2 -
 .../partials/cubeDesigner/streamingConfig.html  |  33 +---
 16 files changed, 344 insertions(+), 98 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index 8c548be..fae81ce 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -19,6 +19,7 @@ package org.apache.kylin.job.streaming;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import javax.annotation.Nullable;
@@ -26,6 +27,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -35,8 +37,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 
-import org.apache.kylin.source.kafka.util.KafkaClient;
-
 /**
  * Load prepared data into kafka(for test use)
  */
@@ -60,10 +60,7 @@ public class Kafka10DataLoader extends StreamDataLoader {
             }
         }), ",");
 
-        Properties props = new Properties();
-        props.put("acks", "1");
-        props.put("retry.backoff.ms", "1000");
-        KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
+        KafkaProducer producer = getKafkaProducer(brokerList, null);
 
         for (int i = 0; i < messages.size(); i++) {
             ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
@@ -73,4 +70,30 @@ public class Kafka10DataLoader extends StreamDataLoader {
         producer.close();
     }
 
+    public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
+        Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
+        return producer;
+    }
+
+    private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) {
+        Properties props = new Properties();
+        props.put("retry.backoff.ms", "1000");
+        props.put("bootstrap.servers", brokers);
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        props.put("acks", "1");
+        props.put("buffer.memory", 33554432);
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 50);
+        props.put("request.timeout.ms", "30000");
+        if (properties != null) {
+            for (Map.Entry entry : properties.entrySet()) {
+                props.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return props;
+    }
+
 }
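The producer helper moved into this test class builds its defaults first and then copies the caller's Properties over them, so caller-supplied settings win. A standalone sketch of that merge order (the "acks" override value is hypothetical, not from this commit):

    import java.util.Map;
    import java.util.Properties;

    public class ProducerPropsMergeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("acks", "1");                 // built-in default, as above
            props.put("retry.backoff.ms", "1000");  // built-in default, as above

            Properties caller = new Properties();
            caller.put("acks", "all");              // hypothetical caller override

            // same loop shape as constructDefaultKafkaProducerProperties:
            // later puts replace earlier ones, so the caller's value survives
            for (Map.Entry<Object, Object> e : caller.entrySet()) {
                props.put(e.getKey(), e.getValue());
            }
            System.out.println(props.getProperty("acks")); // prints "all"
        }
    }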

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/build/conf/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin-kafka-consumer.xml b/build/conf/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/build/conf/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+  for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>30000</value>
+    </property>
+</configuration>
\ No newline at end of file
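This template is in Hadoop configuration-XML format, so it can be read back with org.apache.hadoop.conf.Configuration, which is exactly what the new KafkaConsumerProperties class further down does. A standalone sketch (the relative path is a placeholder; the real lookup resolves the file via KYLIN_CONF or KYLIN_HOME/conf):

    import java.io.FileInputStream;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;

    public class ConsumerXmlSketch {
        public static void main(String[] args) throws Exception {
            // false = do not load Hadoop's default resources, only this file
            Configuration conf = new Configuration(false);
            conf.addResource(new FileInputStream("conf/kylin-kafka-consumer.xml"), "kylin-kafka-consumer.xml");

            Properties props = new Properties();
            for (Map.Entry<String, String> e : conf) {
                props.put(e.getKey(), e.getValue());
            }
            System.out.println(props.getProperty("session.timeout.ms")); // 30000
        }
    }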

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.xml b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/localmeta/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+  for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>30000</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.xml b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
new file mode 100644
index 0000000..6da7004
--- /dev/null
+++ b/examples/test_case_data/sandbox/kylin-kafka-consumer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+  for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs
+-->
+<configuration>
+    <property>
+        <name>session.timeout.ms</name>
+        <value>30000</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51479c8..c837b10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
 
         <!-- HBase versions -->
         <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
-        <kafka.version>0.10.0.0</kafka.version>
+        <kafka.version>0.10.1.0</kafka.version>
 
         <!-- Hadoop deps, keep compatible with hadoop2.version -->
         <zookeeper.version>3.4.6</zookeeper.version>
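This kafka-clients bump is not incidental: the new KafkaConsumerProperties class below calls ConsumerConfig.configNames(), which, per the comment in that class, is only available on 0.10.1.0+. A standalone sketch of guarding that version-sensitive call; note that a method missing at runtime surfaces as NoSuchMethodError (an Error, not an Exception), so a defensive caller would catch Throwable:

    import java.util.Set;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ConfigNamesSketch {
        public static void main(String[] args) {
            try {
                // available on kafka-clients 0.10.1.0+ per the comment below
                Set<String> names = ConsumerConfig.configNames();
                System.out.println("known consumer config keys: " + names.size());
            } catch (Throwable t) {
                System.out.println("configNames() unavailable; fall back to a static list");
            }
        }
    }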

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index e469f77..12f3da2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -83,8 +83,8 @@ public class KafkaSource implements ISource {
                 logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
                 result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
-                // from the topic's very begining;
-                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's very beginning.");
+                // from the topic's earliest offset;
+                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
                 result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index 95349c2..afe888f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -47,10 +47,6 @@ public class KafkaClusterConfig extends RootPersistentEntity {
     @JsonBackReference
     private KafkaConfig kafkaConfig;
 
-    public int getBufferSize() {
-        return kafkaConfig.getBufferSize();
-    }
-
     public String getTopic() {
         return kafkaConfig.getTopic();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 157d83c..a096344 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -55,9 +55,6 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("timeout")
     private int timeout;
 
-    @JsonProperty("bufferSize")
-    private int bufferSize;
-
     @JsonProperty("parserName")
     private String parserName;
 
@@ -97,14 +94,6 @@ public class KafkaConfig extends RootPersistentEntity {
         this.timeout = timeout;
     }
 
-    public int getBufferSize() {
-        return bufferSize;
-    }
-
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
     public String getTopic() {
         return topic;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
new file mode 100644
index 0000000..5bc1a82
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.kafka.config;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigCannotInitException;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConsumerProperties {
+
+    public static final String KAFKA_CONSUMER_FILE = "kylin-kafka-consumer.xml";
+    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProperties.class);
+    // static cached instances
+    private static KafkaConsumerProperties ENV_INSTANCE = null;
+    private volatile Properties properties = new Properties();
+
+    private KafkaConsumerProperties() {
+
+    }
+
+    public static KafkaConsumerProperties getInstanceFromEnv() {
+        synchronized (KafkaConsumerProperties.class) {
+            if (ENV_INSTANCE == null) {
+                try {
+                    KafkaConsumerProperties config = new KafkaConsumerProperties();
+                    config.properties = config.loadKafkaConsumerProperties();
+
+                    logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config));
+                    ENV_INSTANCE = config;
+                } catch (IllegalArgumentException e) {
+                    throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e);
+                }
+            }
+            return ENV_INSTANCE;
+        }
+    }
+
+    private static File getKafkaConsumerFile(String path) {
+        if (path == null) {
+            return null;
+        }
+
+        return new File(path, KAFKA_CONSUMER_FILE);
+    }
+
+    public static Properties getProperties(Configuration configuration) {
+        Set<String> configNames = new HashSet<String>();
+        try {
+            configNames = ConsumerConfig.configNames();
+        } catch (Exception e) {
+            // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException
+            String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms,"
+                    + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type,"
+                    + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password,"
+                    + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms,"
+                    + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes,"
+                    + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms,"
+                    + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(",");
+            configNames.addAll(Arrays.asList(configNamesArray));
+        }
+
+        Properties result = new Properties();
+        for (Iterator<Map.Entry<String, String>> it = configuration.iterator(); it.hasNext();) {
+            Map.Entry<String, String> entry = it.next();
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (configNames.contains(key)) {
+                result.put(key, value);
+            }
+        }
+        return result;
+    }
+
+    private Properties loadKafkaConsumerProperties() {
+        File propFile = getKafkaConsumerFile();
+        if (propFile == null || !propFile.exists()) {
+            logger.error("fail to locate " + KAFKA_CONSUMER_FILE);
+            throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_FILE);
+        }
+        Properties properties = new Properties();
+        try {
+            FileInputStream is = new FileInputStream(propFile);
+            Configuration conf = new Configuration();
+            conf.addResource(is);
+            properties.putAll(getProperties(conf));
+            IOUtils.closeQuietly(is);
+
+            File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
+            if (propOverrideFile.exists()) {
+                FileInputStream ois = new FileInputStream(propOverrideFile);
+                Properties propOverride = new Properties();
+                Configuration oconf = new Configuration();
+                oconf.addResource(ois);
+                properties.putAll(getProperties(oconf));
+                IOUtils.closeQuietly(ois);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return properties;
+    }
+
+    public String getKafkaConsumerHadoopJobConf() {
+        File kafkaConsumerFile = getKafkaConsumerFile();
+        return OptionsHelper.convertToFileURL(kafkaConsumerFile.getAbsolutePath());
+    }
+
+    private File getKafkaConsumerFile() {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF);
+        if (!StringUtils.isEmpty(kylinConfHome)) {
+            logger.info("Use KYLIN_CONF=" + kylinConfHome);
+            return getKafkaConsumerFile(kylinConfHome);
+        }
+
+        logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable");
+
+        String kylinHome = kylinConfig.getKylinHome();
+        if (StringUtils.isEmpty(kylinHome))
+            throw new KylinConfigCannotInitException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them");
+
+        String path = kylinHome + File.separator + "conf";
+        return getKafkaConsumerFile(path);
+    }
+
+    public Properties getProperties() {
+        Properties prop = new Properties();
+        prop.putAll(this.properties);
+        return prop;
+    }
+}
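A minimal usage sketch for the new class (it assumes KYLIN_CONF or KYLIN_HOME points at a conf directory containing kylin-kafka-consumer.xml, as in the shipped template above):

    import java.util.Properties;

    import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;

    public class KafkaConsumerPropertiesUsageSketch {
        public static void main(String[] args) {
            Properties kafkaProps = KafkaConsumerProperties.getInstanceFromEnv().getProperties();
            // With the shipped template this prints 30000. Keys in the XML that are
            // not recognized Kafka consumer configs are filtered out, because
            // getProperties(Configuration) keeps only names from ConsumerConfig.
            System.out.println(kafkaProps.getProperty("session.timeout.ms"));
        }
    }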
"kafka.connect.buffer.size"; public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group"; public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format"; public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name"; + @Override public int run(String[] args) throws Exception { Options options = new Options(); @@ -100,10 +101,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); + KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); + job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())); job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); - job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize())); job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index fe0e2cc..96bac3f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; @@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import org.apache.kylin.source.kafka.util.KafkaClient; import com.google.common.base.Preconditions; @@ -65,8 +67,9 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { } } + Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf); final List<InputSplit> splits = new ArrayList<InputSplit>(); - try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) { + try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic); Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side"); for (int i = 0; i < partitionInfos.size(); i++) { http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index 6774c9d..e8bd76c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka.hadoop;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
@@ -33,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +46,8 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> {
 
     static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class);
 
+    public static final long DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT = 60000;
+
     private Configuration conf;
 
     private KafkaInputSplit split;
@@ -61,8 +65,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> {
     private LongWritable key;
     private BytesWritable value;
 
-    private long timeOut = 60000;
-    private long bufferSize = 65536;
+    private long timeOut = DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT;
 
     private long numProcessedMessages = 0L;
 
@@ -82,12 +85,11 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> {
         if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) {
             timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT));
         }
-        if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) {
-            bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE));
-        }
         String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
-        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null);
+
+        Properties kafkaProperties = KafkaConsumerProperties.getProperties(conf);
+
+        consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties);
 
         earliestOffset = this.split.getOffsetStart();
         latestOffset = this.split.getOffsetEnd();

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 3b970b3..69d7440 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -20,9 +20,9 @@ package org.apache.kylin.source.kafka.util;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
@@ -45,43 +45,17 @@ public class KafkaClient {
         return consumer;
     }
 
-    public static KafkaProducer getKafkaProducer(String brokers, Properties properties) {
-        Properties props = constructDefaultKafkaProducerProperties(brokers, properties);
-        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
-        return producer;
-    }
-
-    private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) {
+    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
         Properties props = new Properties();
-        props.put("bootstrap.servers", brokers);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("acks", "1");
-        props.put("buffer.memory", 33554432);
-        props.put("retries", 0);
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 50);
-        props.put("request.timeout.ms", "30000");
         if (properties != null) {
             for (Map.Entry entry : properties.entrySet()) {
                 props.put(entry.getKey(), entry.getValue());
             }
         }
-        return props;
-    }
-
-    private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
-        Properties props = new Properties();
         props.put("bootstrap.servers", brokers);
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
         props.put("group.id", consumerGroup);
-        props.put("session.timeout.ms", "30000");
-        if (properties != null) {
-            for (Map.Entry entry : properties.entrySet()) {
-                props.put(entry.getKey(), entry.getValue());
-            }
-        }
         props.put("enable.auto.commit", "false");
         return props;
    }
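Note the precedence change in this refactor: user-supplied properties (now typically loaded from kylin-kafka-consumer.xml) are copied in first, and Kylin's fixed settings are applied afterwards, so bootstrap.servers, the deserializers, group.id and enable.auto.commit always win over the XML file, while session.timeout.ms moves out of code into the config file. A standalone sketch of that ordering (the "true" value is a hypothetical override attempt):

    import java.util.Map;
    import java.util.Properties;

    public class ConsumerPropsPrecedenceSketch {
        public static void main(String[] args) {
            Properties user = new Properties();
            user.put("enable.auto.commit", "true"); // hypothetical value from the XML

            Properties props = new Properties();
            // user properties first, mirroring constructDefaultKafkaConsumerProperties
            for (Map.Entry<Object, Object> e : user.entrySet()) {
                props.put(e.getKey(), e.getValue());
            }
            props.put("enable.auto.commit", "false"); // Kylin's setting, applied last

            System.out.println(props.getProperty("enable.auto.commit")); // prints "false"
        }
    }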

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..8edb84d
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -0,0 +1,50 @@
+package org.apache.kylin.source.kafka.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase {
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testLoadKafkaProperties() {
+        KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+        assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks"));
+        assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms"));
+        assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms"));
+    }
+
+    @Test
+    public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException {
+        KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
+        Configuration conf = new Configuration(false);
+        conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE);
+        assertEquals("30000", conf.get("session.timeout.ms"));
+
+        Properties prop = KafkaConsumerProperties.getProperties(conf);
+        assertEquals("30000", prop.getProperty("session.timeout.ms"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/webapp/app/js/model/streamingModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/streamingModel.js b/webapp/app/js/model/streamingModel.js
index 0321806..70f27aa 100644
--- a/webapp/app/js/model/streamingModel.js
+++ b/webapp/app/js/model/streamingModel.js
@@ -33,9 +33,7 @@ KylinApp.service('StreamingModel', function () {
       "name": "",
       "topic": "",
       "timeout": "60000",
-      "bufferSize": "65536",
       "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
-      "margin": "300000",
       "clusters":[],
       "parserProperties":""
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9dbc24bf/webapp/app/partials/cubeDesigner/streamingConfig.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/streamingConfig.html b/webapp/app/partials/cubeDesigner/streamingConfig.html
index 8bdcd25..5dc9788 100644
--- a/webapp/app/partials/cubeDesigner/streamingConfig.html
+++ b/webapp/app/partials/cubeDesigner/streamingConfig.html
@@ -238,37 +238,9 @@
               </div>
           </div>
 
-          <div class="form-group middle-popover" ng-class="{'required':state.mode=='edit'}">
-            <div class="row">
-              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
-                <b>Buffer Size</b>
-                <i class="fa fa-info-circle" kylinpopover placement="right" title="Buffer Size" template="BufferSizecTip.html"></i>
-              </label>
-
-              <div class="col-xs-12 col-sm-6"
-                   ng-class="{'has-error':form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)}">
-                <input ng-if="state.mode=='edit'" name="bufferSize" required ng-model="kafkaMeta.bufferSize" type="text"
-                       placeholder="Input kafkaConfig bufferSize"
-                       ng-pattern="/^\+?[1-9][0-9]*$/"
-                       class="form-control"/>
-                <small class="help-block"
-                       ng-show="!form.cube_streaming_form.bufferSize.$error.required && form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)">
-                  Kafka bufferSize is invalid.
-                </small>
-                <small class="help-block"
-                       ng-show="form.cube_streaming_form.bufferSize.$error.required && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)">
-                  Kafka bufferSize is required.
-                </small>
-                <span ng-if="state.mode=='view'">{{kafkaMeta.bufferSize}}</span>
-              </div>
-            </div>
-          </div>
-
         </accordion-group>
       </accordion>
-
-
   </div>
 </form>
 </div>
@@ -279,11 +251,8 @@
 <script type="text/ng-template" id="TimeoutTip.html">
     <p>Set timeout for kafka client.</p>
 </script>
-<script type="text/ng-template" id="BufferSizecTip.html">
-    <p>Set byte size for kafka client's buffer.</p>
-</script>
 <script type="text/ng-template" id="MarginTip.html">
-    <p>When the messages in kafka is not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p>
+    <p>Deprecated. When the messages in kafka is not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p>
 </script>
 <script type="text/ng-template" id="ParserName.html">
     <p>Set the parser to parse source data messages. The default parser works for json messages with a timestamp field.</p>