KYLIN-2131 Load Kafka config from kylin-kafka-consumer.properties
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ffca41be Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ffca41be Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ffca41be Branch: refs/heads/KYLIN-2131 Commit: ffca41be520ed644b1965c8e1872c6acfc7d9a0c Parents: e14f4e1 Author: Billy Liu <billy...@apache.org> Authored: Thu Dec 15 19:18:32 2016 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Sun Dec 18 14:15:13 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/streaming/Kafka10DataLoader.java | 34 ++++- build/conf/kylin-kafka-consumer.properties | 19 +++ .../localmeta/kylin-kafka-consumer.properties | 19 +++ .../sandbox/kylin-kafka-consumer.properties | 19 +++ .../apache/kylin/source/kafka/KafkaSource.java | 5 +- .../source/kafka/config/KafkaClusterConfig.java | 8 -- .../kylin/source/kafka/config/KafkaConfig.java | 11 -- .../kafka/config/KafkaConsumerProperties.java | 126 +++++++++++++++++++ .../source/kafka/hadoop/KafkaFlatTableJob.java | 7 +- .../source/kafka/hadoop/KafkaInputFormat.java | 7 +- .../kafka/hadoop/KafkaInputRecordReader.java | 18 +-- .../kylin/source/kafka/util/KafkaClient.java | 43 ++----- .../config/KafkaConsumerPropertiesTest.java | 31 +++++ webapp/app/js/model/streamingModel.js | 2 - .../partials/cubeDesigner/streamingConfig.html | 33 +---- 15 files changed, 280 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java index 8c548be..c7a487a 100644 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java @@ -19,6 +19,7 @@ package org.apache.kylin.job.streaming; import java.util.List; +import java.util.Map; import java.util.Properties; import javax.annotation.Nullable; @@ -35,8 +36,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.collect.Collections2; -import org.apache.kylin.source.kafka.util.KafkaClient; - /** * Load prepared data into kafka(for test use) */ @@ -60,10 +59,7 @@ public class Kafka10DataLoader extends StreamDataLoader { } }), ","); - Properties props = new Properties(); - props.put("acks", "1"); - props.put("retry.backoff.ms", "1000"); - KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props); + KafkaProducer producer = getKafkaProducer(brokerList, null); for (int i = 0; i < messages.size(); i++) { ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); @@ -73,4 +69,30 @@ public class Kafka10DataLoader extends StreamDataLoader { producer.close(); } + public static KafkaProducer getKafkaProducer(String brokers, Properties properties) { + Properties props = constructDefaultKafkaProducerProperties(brokers, properties); + KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); + return producer; + } + + private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) { + Properties 
props = new Properties(); + props.put("retry.backoff.ms", "1000"); + props.put("bootstrap.servers", brokers); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("acks", "1"); + props.put("buffer.memory", 33554432); + props.put("retries", 0); + props.put("batch.size", 16384); + props.put("linger.ms", 50); + props.put("request.timeout.ms", "30000"); + if (properties != null) { + for (Map.Entry entry : properties.entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + } + return props; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/build/conf/kylin-kafka-consumer.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin-kafka-consumer.properties b/build/conf/kylin-kafka-consumer.properties new file mode 100644 index 0000000..d198f4e --- /dev/null +++ b/build/conf/kylin-kafka-consumer.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs +session.timeout.ms=30000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/examples/test_case_data/localmeta/kylin-kafka-consumer.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kylin-kafka-consumer.properties b/examples/test_case_data/localmeta/kylin-kafka-consumer.properties new file mode 100644 index 0000000..d198f4e --- /dev/null +++ b/examples/test_case_data/localmeta/kylin-kafka-consumer.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs +session.timeout.ms=30000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/examples/test_case_data/sandbox/kylin-kafka-consumer.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin-kafka-consumer.properties b/examples/test_case_data/sandbox/kylin-kafka-consumer.properties new file mode 100644 index 0000000..d198f4e --- /dev/null +++ b/examples/test_case_data/sandbox/kylin-kafka-consumer.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# for more kafka consumer configs, please refer to http://kafka.apache.org/documentation#consumerconfigs +session.timeout.ms=30000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index e469f77..1f3c446 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -20,6 +20,7 @@ package org.apache.kylin.source.kafka; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -34,6 +35,7 @@ import org.apache.kylin.source.ISource; import org.apache.kylin.source.ReadableTable; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import org.apache.kylin.source.kafka.util.KafkaClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +94,8 @@ public class KafkaSource implements ISource { final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable()); final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); final String topic = kafakaConfig.getTopic(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { + final Properties kafkaProperties = KafkaConsumerProperties.getInstanceFromEnv().getProperties(); + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), kafkaProperties)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { if 
(result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) { http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java index 95349c2..3b71189 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java @@ -47,18 +47,10 @@ public class KafkaClusterConfig extends RootPersistentEntity { @JsonBackReference private KafkaConfig kafkaConfig; - public int getBufferSize() { - return kafkaConfig.getBufferSize(); - } - public String getTopic() { return kafkaConfig.getTopic(); } - public int getTimeout() { - return kafkaConfig.getTimeout(); - } - public List<BrokerConfig> getBrokerConfigs() { return brokerConfigs; } http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java index 157d83c..a096344 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java @@ -55,9 +55,6 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("timeout") private int timeout; - @JsonProperty("bufferSize") - private int bufferSize; - @JsonProperty("parserName") private String parserName; @@ -97,14 +94,6 @@ public class KafkaConfig extends RootPersistentEntity { this.timeout = timeout; } - public int getBufferSize() { - return bufferSize; - } - - public void setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - } - public String getTopic() { return topic; } http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java new file mode 100644 index 0000000..7a15e63 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source.kafka.config; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigCannotInitException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class KafkaConsumerProperties { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProperties.class); + + private volatile Properties properties = new Properties(); + + public static final String KAFKA_CONSUMER_PROPERTIES_FILE = "kylin-kafka-consumer.properties"; + + // static cached instances + private static KafkaConsumerProperties ENV_INSTANCE = null; + + public static KafkaConsumerProperties getInstanceFromEnv() { + synchronized (KafkaConsumerProperties.class) { + if (ENV_INSTANCE == null) { + try { + KafkaConsumerProperties config = new KafkaConsumerProperties(); + config.properties = loadKafkaConsumerProperties(); + + logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config)); + ENV_INSTANCE = config; + } catch (IllegalArgumentException e) { + throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e); + } + } + return ENV_INSTANCE; + } + } + + public Properties getProperties(){ + Properties result = new Properties(); + result.putAll(properties); + return result; + } + + public InputStream getHadoopJobConfInputStream() throws IOException { + File kafkaProperties = getKafkaConsumerPropertiesFile(); + return FileUtils.openInputStream(kafkaProperties); + } + + private static Properties loadKafkaConsumerProperties() { + File propFile = getKafkaConsumerPropertiesFile(); + if (propFile == null || !propFile.exists()) { + logger.error("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE); + throw new RuntimeException("fail to locate " + KAFKA_CONSUMER_PROPERTIES_FILE); + } + Properties conf = new Properties(); + try { + FileInputStream is = new FileInputStream(propFile); + conf.load(is); + IOUtils.closeQuietly(is); + + File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override"); + if (propOverrideFile.exists()) { + FileInputStream ois = new FileInputStream(propOverrideFile); + Properties propOverride = new Properties(); + propOverride.load(ois); + IOUtils.closeQuietly(ois); + conf.putAll(propOverride); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + return conf; + } + + private static File getKafkaConsumerPropertiesFile() { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF); + if (!StringUtils.isEmpty(kylinConfHome)) { + logger.info("Use KYLIN_CONF=" + kylinConfHome); + return getKafkaConsumerPropertiesFile(kylinConfHome); + } + + logger.warn("KYLIN_CONF property was not set, will seek KYLIN_HOME env variable"); + + String kylinHome = kylinConfig.getKylinHome(); + if (StringUtils.isEmpty(kylinHome)) + throw new KylinConfigCannotInitException("Didn't find KYLIN_CONF or KYLIN_HOME, please set one of them"); + + String path = kylinHome + File.separator + "conf"; + return getKafkaConsumerPropertiesFile(path); + } + + private static File 
getKafkaConsumerPropertiesFile(String path) { + if (path == null) { + return null; + } + + return new File(path, KAFKA_CONSUMER_PROPERTIES_FILE); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index f4d54c5..e030719 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -19,6 +19,7 @@ package org.apache.kylin.source.kafka.hadoop; import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import org.apache.kylin.source.kafka.util.KafkaClient; import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; @@ -57,10 +58,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers"; public static final String CONFIG_KAFKA_TOPIC = "kafka.topic"; public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout"; - public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size"; public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group"; public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format"; public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name"; + public static final String CONFIG_KAFKA_CONSUMER_PROPERTIES = "kafka.consumer.properties"; + @Override public int run(String[] args) throws Exception { Options options = new Options(); @@ -100,10 +102,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); + KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv(); + job.getConfiguration().addResource(kafkaFileConfig.getHadoopJobConfInputStream(), CONFIG_KAFKA_CONSUMER_PROPERTIES); job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); - job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize())); job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index fe0e2cc..0aab72e 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -19,9 +19,11 @@ package 
org.apache.kylin.source.kafka.hadoop; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; @@ -65,8 +67,11 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { } } + InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES); + Properties kafkaProperties = new Properties(); + kafkaProperties.load(inputStream); final List<InputSplit> splits = new ArrayList<InputSplit>(); - try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) { + try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic); Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side"); for (int i = 0; i < partitionInfos.size(); i++) { http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java index 6774c9d..dfd0e59 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java @@ -19,8 +19,10 @@ package org.apache.kylin.source.kafka.hadoop; import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Iterator; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; @@ -44,6 +46,8 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class); + public static final long DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT = 60000; + private Configuration conf; private KafkaInputSplit split; @@ -61,8 +65,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit private LongWritable key; private BytesWritable value; - private long timeOut = 60000; - private long bufferSize = 65536; + private long timeOut = DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT; private long numProcessedMessages = 0L; @@ -82,12 +85,13 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) { timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT)); } - if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) { - bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE)); - } - String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); - consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null); + + InputStream inputStream = conf.getConfResourceAsInputStream(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_PROPERTIES); + Properties kafkaProperties = new Properties(); + kafkaProperties.load(inputStream); + + consumer = 
org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties); earliestOffset = this.split.getOffsetStart(); latestOffset = this.split.getOffsetEnd(); http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index 3b970b3..f891467 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -20,15 +20,16 @@ package org.apache.kylin.source.kafka.util; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.config.KafkaConsumerProperties; import java.util.Arrays; import java.util.List; @@ -39,49 +40,25 @@ import java.util.Properties; */ public class KafkaClient { + private static KafkaConsumerProperties kafkaFileConfig = KafkaConsumerProperties.getInstanceFromEnv(); + public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) { Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); return consumer; } - public static KafkaProducer getKafkaProducer(String brokers, Properties properties) { - Properties props = constructDefaultKafkaProducerProperties(brokers, properties); - KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); - return producer; - } - - private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties) { + private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) { Properties props = new Properties(); - props.put("bootstrap.servers", brokers); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("acks", "1"); - props.put("buffer.memory", 33554432); - props.put("retries", 0); - props.put("batch.size", 16384); - props.put("linger.ms", 50); - props.put("request.timeout.ms", "30000"); if (properties != null) { for (Map.Entry entry : properties.entrySet()) { props.put(entry.getKey(), entry.getValue()); } } - return props; - } - - private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) { - Properties props = new Properties(); props.put("bootstrap.servers", brokers); - props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("key.deserializer", StringDeserializer.class.getClass().getCanonicalName()); + props.put("value.deserializer", StringDeserializer.class.getClass().getCanonicalName()); props.put("group.id", consumerGroup); - props.put("session.timeout.ms", "30000"); - if (properties != null) { - for (Map.Entry entry : properties.entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } - } props.put("enable.auto.commit", "false"); return props; } @@ -129,7 +106,8 @@ public class KafkaClient { final String topic = kafakaConfig.getTopic(); Map<Integer, Long> startOffsets = Maps.newHashMap(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { + Properties kafkaProperties = kafkaFileConfig.getProperties(); + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { long latest = getLatestOffset(consumer, topic, partitionInfo.partition()); @@ -147,7 +125,8 @@ public class KafkaClient { final String topic = kafakaConfig.getTopic(); Map<Integer, Long> startOffsets = Maps.newHashMap(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) { + Properties kafkaProperties = kafkaFileConfig.getProperties(); + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), kafkaProperties)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { long latest = getEarliestOffset(consumer, topic, partitionInfo.partition()); http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java new file mode 100644 index 0000000..3690439 --- /dev/null +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java @@ -0,0 +1,31 @@ +package org.apache.kylin.source.kafka.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase { + @Before + public void setUp() throws Exception { + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testLoadKafkaConfig() { + KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); + assertFalse(kafkaConsumerProperties.getProperties().containsKey("acks")); + assertTrue(kafkaConsumerProperties.getProperties().containsKey("session.timeout.ms")); + assertEquals("30000", kafkaConsumerProperties.getProperties().getProperty("session.timeout.ms")); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/webapp/app/js/model/streamingModel.js 
---------------------------------------------------------------------- diff --git a/webapp/app/js/model/streamingModel.js b/webapp/app/js/model/streamingModel.js index 0321806..70f27aa 100644 --- a/webapp/app/js/model/streamingModel.js +++ b/webapp/app/js/model/streamingModel.js @@ -33,9 +33,7 @@ KylinApp.service('StreamingModel', function () { "name": "", "topic": "", "timeout": "60000", - "bufferSize": "65536", "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser", - "margin": "300000", "clusters":[], "parserProperties":"" } http://git-wip-us.apache.org/repos/asf/kylin/blob/ffca41be/webapp/app/partials/cubeDesigner/streamingConfig.html ---------------------------------------------------------------------- diff --git a/webapp/app/partials/cubeDesigner/streamingConfig.html b/webapp/app/partials/cubeDesigner/streamingConfig.html index 8bdcd25..5dc9788 100644 --- a/webapp/app/partials/cubeDesigner/streamingConfig.html +++ b/webapp/app/partials/cubeDesigner/streamingConfig.html @@ -238,37 +238,9 @@ </div> </div> - <div class="form-group middle-popover" ng-class="{'required':state.mode=='edit'}"> - <div class="row"> - <label class="col-xs-12 col-sm-3 control-label no-padding-right"> - <b>Buffer Size</b> - <i class="fa fa-info-circle" kylinpopover placement="right" title="Buffer Size" template="BufferSizecTip.html"></i> - </label> - - <div class="col-xs-12 col-sm-6" - ng-class="{'has-error':form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)}"> - <input ng-if="state.mode=='edit'" name="bufferSize" required ng-model="kafkaMeta.bufferSize" type="text" - placeholder="Input kafkaConfig bufferSize" - ng-pattern="/^\+?[1-9][0-9]*$/" - class="form-control"/> - <small class="help-block" - ng-show="!form.cube_streaming_form.bufferSize.$error.required && form.cube_streaming_form.bufferSize.$invalid && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)"> - Kafka bufferSize is invalid. - </small> - <small class="help-block" - ng-show="form.cube_streaming_form.bufferSize.$error.required && (form.cube_streaming_form.bufferSize.$dirty||form.cube_streaming_form.$submitted)"> - Kafka bufferSize is required. - </small> - <span ng-if="state.mode=='view'">{{kafkaMeta.bufferSize}}</span> - </div> - </div> - </div> - </accordion-group> </accordion> - - </div> </form> </div> @@ -279,11 +251,8 @@ <script type="text/ng-template" id="TimeoutTip.html"> <p>Set timeout for kafka client.</p> </script> -<script type="text/ng-template" id="BufferSizecTip.html"> - <p>Set byte size for kafka client's buffer.</p> - </script> <script type="text/ng-template" id="MarginTip.html"> - <p>When the messages in kafka is not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p> + <p>Deprecated. When the messages in Kafka are not strictly sorted on timestamp, read more data (expressed in ts) before and after the specified interval to avoid data loss.</p> </script> <script type="text/ng-template" id="ParserName.html"> <p>Set the parser to parse source data messages. The default parser works for json messages with a timestamp field.</p>
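----------------------------------------------------------------------

For operators, the change boils down to this: consumer settings now live in kylin-kafka-consumer.properties (resolved from the directory named by the KYLIN_CONF system property, else $KYLIN_HOME/conf), an optional kylin-kafka-consumer.properties.override beside it wins over the base file, and the merged result is handed to every KafkaConsumer Kylin creates and shipped to the MR flat-table job through the Hadoop job configuration. Below is a minimal, self-contained Java sketch of that lookup-and-merge behaviour; the conf path, broker list, and group id are illustrative placeholders, and the sketch mirrors (rather than calls) the KafkaConsumerProperties and KafkaClient code in this patch:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class KafkaConsumerPropertiesSketch {

    // Mirrors loadKafkaConsumerProperties(): read the base file, then let an
    // optional "<name>.override" file in the same directory win over it.
    static Properties load(File propFile) throws IOException {
        Properties conf = new Properties();
        try (InputStream in = new FileInputStream(propFile)) {
            conf.load(in);
        }
        File overrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
        if (overrideFile.exists()) {
            Properties override = new Properties();
            try (InputStream in = new FileInputStream(overrideFile)) {
                override.load(in);
            }
            conf.putAll(override); // override entries replace base entries
        }
        return conf;
    }

    // Mirrors constructDefaultKafkaConsumerProperties(): file-supplied entries
    // go in first, then the keys Kylin always sets, so the forced keys win.
    static Properties consumerProps(String brokers, String consumerGroup, Properties fromFile) {
        Properties props = new Properties();
        if (fromFile != null) {
            props.putAll(fromFile);
        }
        props.put("bootstrap.servers", brokers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", consumerGroup);
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder path; Kylin resolves it via the KYLIN_CONF system
        // property or $KYLIN_HOME/conf at runtime.
        File confFile = new File("/usr/local/kylin/conf/kylin-kafka-consumer.properties");
        Properties fromFile = load(confFile);
        Properties props = consumerProps("broker1:9092,broker2:9092", "my_cube", fromFile);
        System.out.println(props); // with the shipped file: includes session.timeout.ms=30000
    }
}

With the shipped default file only session.timeout.ms=30000 comes from disk; any option from http://kafka.apache.org/documentation#consumerconfigs can be added there (or in the .override file) without rebuilding, but bootstrap.servers, the two deserializers, group.id, and enable.auto.commit are always forced by KafkaClient and cannot be overridden from the file.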