This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new c3e02c3 KYLIN-3597 Improve code smell c3e02c3 is described below commit c3e02c3f0c464008dc45c559888c4149b0457376 Author: hit-lacus <hit_la...@126.com> AuthorDate: Fri Sep 28 14:43:54 2018 +0800 KYLIN-3597 Improve code smell --- .../storage/gtrecord/SegmentCubeTupleIterator.java | 19 +++++-- .../kafka/config/KafkaConsumerProperties.java | 30 +++++----- .../source/kafka/util/KafkaSampleProducer.java | 66 +++++++++++----------- .../kylin/storage/hbase/util/RowCounterCLI.java | 45 ++++++++------- 4 files changed, 83 insertions(+), 77 deletions(-) diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java index 6711664..629c025 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java @@ -98,14 +98,23 @@ public class SegmentCubeTupleIterator implements ITupleIterator { return scanRequest.getInfo(); } - public void close() throws IOException {} + public void close() { + // Underlying resource is hold by scanner and it will be closed at + // SegmentCubeTupleIterator#close, caller is SequentialCubeTupleIterator + } public Iterator<GTRecord> iterator() { return records; } }; - GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest); - return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx); + Iterator<Object[]> result; + try (GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(inputScanner, scanRequest)) { + result = aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx); + } catch (IOException ioe) { + // implementation of close method of anonymous IGTScanner is empty, no way throw exception + throw new IllegalStateException("IOException is not expected here.", ioe); + } + return result; } // simply decode records @@ -149,10 +158,10 @@ public class SegmentCubeTupleIterator implements ITupleIterator { if (!gtValues.hasNext()) { return false; } - Object[] gtValues = this.gtValues.next(); + Object[] values = this.gtValues.next(); // translate into tuple - advMeasureFillers = cubeTupleConverter.translateResult(gtValues, tuple); + advMeasureFillers = cubeTupleConverter.translateResult(values, tuple); // the simple case if (advMeasureFillers == null) { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java index cc32ed9..a1b9ab2 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java @@ -20,6 +20,7 @@ package org.apache.kylin.source.kafka.config; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; @@ -28,7 +29,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -56,8 +56,8 @@ public class KafkaConsumerProperties { try { KafkaConsumerProperties config = new KafkaConsumerProperties(); config.properties = config.loadKafkaConsumerProperties(); - - logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config)); + logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : {}", + System.identityHashCode(config)); ENV_INSTANCE = config; } catch (IllegalArgumentException e) { throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e); @@ -79,7 +79,7 @@ public class KafkaConsumerProperties { Set<String> configNames = new HashSet<String>(); try { configNames = ConsumerConfig.configNames(); - } catch (Error e) { + } catch (Exception e) { // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException which is an Error, not Exception String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " au [...] + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.windo [...] @@ -101,27 +101,27 @@ public class KafkaConsumerProperties { private Properties loadKafkaConsumerProperties() { File propFile = getKafkaConsumerFile(); if (propFile == null || !propFile.exists()) { - logger.warn("fail to locate " + KAFKA_CONSUMER_FILE + ", use empty kafka consumer properties"); + logger.warn("fail to locate {}, use empty kafka consumer properties", KAFKA_CONSUMER_FILE); return new Properties(); } Properties properties = new Properties(); - try { - FileInputStream is = new FileInputStream(propFile); + try (FileInputStream is = new FileInputStream(propFile)) { Configuration conf = new Configuration(); conf.addResource(is); properties.putAll(extractKafkaConfigToProperties(conf)); - IOUtils.closeQuietly(is); File propOverrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override"); if (propOverrideFile.exists()) { - FileInputStream ois = new FileInputStream(propOverrideFile); - Configuration oconf = new Configuration(); - oconf.addResource(ois); - properties.putAll(extractKafkaConfigToProperties(oconf)); - IOUtils.closeQuietly(ois); + try (FileInputStream ois = new FileInputStream(propOverrideFile)) { + Configuration oconf = new Configuration(); + oconf.addResource(ois); + properties.putAll(extractKafkaConfigToProperties(oconf)); + } } + } catch (FileNotFoundException fne) { + throw new IllegalArgumentException(fne); } catch (IOException e) { - throw new RuntimeException(e); + // close inputStream quietly } return properties; @@ -135,7 +135,7 @@ public class KafkaConsumerProperties { private File getKafkaConsumerFile() { String kylinConfHome = System.getProperty(KylinConfig.KYLIN_CONF); if (!StringUtils.isEmpty(kylinConfHome)) { - logger.info("Use KYLIN_CONF=" + kylinConfHome); + logger.info("Use KYLIN_CONF={}", kylinConfHome); return getKafkaConsumerFile(kylinConfHome); } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java index 973f020..56e2dd5 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java @@ -55,7 +55,7 @@ public class KafkaSampleProducer { private static final ObjectMapper mapper = new ObjectMapper(); public static void main(String[] args) throws Exception { - logger.info("args: " + Arrays.toString(args)); + logger.info("args: {}", Arrays.toString(args)); OptionsHelper optionsHelper = new OptionsHelper(); Options options = new Options(); String topic, broker; @@ -64,7 +64,7 @@ public class KafkaSampleProducer { options.addOption(OPTION_INTERVAL); optionsHelper.parseOptions(options, args); - logger.info("options: '" + optionsHelper.getOptionsAsString() + "'"); + logger.info("options: '{}'", optionsHelper.getOptionsAsString()); topic = optionsHelper.getOptionValue(OPTION_TOPIC); broker = optionsHelper.getOptionValue(OPTION_BROKER); @@ -75,7 +75,7 @@ public class KafkaSampleProducer { interval = Long.parseLong(intervalString); } - List<String> countries = new ArrayList(); + List<String> countries = new ArrayList<>(); countries.add("AUSTRALIA"); countries.add("CANADA"); countries.add("CHINA"); @@ -84,19 +84,19 @@ public class KafkaSampleProducer { countries.add("KOREA"); countries.add("US"); countries.add("Other"); - List<String> category = new ArrayList(); + List<String> category = new ArrayList<>(); category.add("BOOK"); category.add("TOY"); category.add("CLOTH"); category.add("ELECTRONIC"); category.add("Other"); - List<String> devices = new ArrayList(); + List<String> devices = new ArrayList<>(); devices.add("iOS"); devices.add("Windows"); devices.add("Andriod"); devices.add("Other"); - List<String> genders = new ArrayList(); + List<String> genders = new ArrayList<>(); genders.add("Male"); genders.add("Female"); @@ -110,34 +110,32 @@ public class KafkaSampleProducer { props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - Producer<String, String> producer = new KafkaProducer<>(props); - - boolean alive = true; - Random rnd = new Random(); - Map<String, Object> record = new HashMap(); - while (alive == true) { - //add normal record - record.put("order_time", (new Date().getTime())); - record.put("country", countries.get(rnd.nextInt(countries.size()))); - record.put("category", category.get(rnd.nextInt(category.size()))); - record.put("device", devices.get(rnd.nextInt(devices.size()))); - record.put("qty", rnd.nextInt(10)); - record.put("currency", "USD"); - record.put("amount", rnd.nextDouble() * 100); - //add embedded record - Map<String, Object> user = new HashMap(); - user.put("id", RandomUtil.randomUUID().toString()); - user.put("gender", genders.get(rnd.nextInt(2))); - user.put("age", rnd.nextInt(20) + 10); - user.put("first_name", "unknown"); - record.put("user", user); - //send message - ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record)); - System.out.println("Sending 1 message: " + JsonUtil.writeValueAsString(record)); - producer.send(data); - Thread.sleep(interval); + try (Producer<String, String> producer = new KafkaProducer<>(props)) { + boolean alive = true; + Random rnd = new Random(); + Map<String, Object> record = new HashMap<>(); + while (alive == true) { + //add normal record + record.put("order_time", (new Date().getTime())); + record.put("country", countries.get(rnd.nextInt(countries.size()))); + record.put("category", category.get(rnd.nextInt(category.size()))); + record.put("device", devices.get(rnd.nextInt(devices.size()))); + record.put("qty", rnd.nextInt(10)); + record.put("currency", "USD"); + record.put("amount", rnd.nextDouble() * 100); + //add embedded record + Map<String, Object> user = new HashMap<>(); + user.put("id", RandomUtil.randomUUID().toString()); + user.put("gender", genders.get(rnd.nextInt(2))); + user.put("age", rnd.nextInt(20) + 10); + user.put("first_name", "unknown"); + record.put("user", user); + //send message + ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record)); + System.out.println("Sending 1 message: " + JsonUtil.writeValueAsString(record)); + producer.send(data); + Thread.sleep(interval); + } } - producer.close(); } - } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java index db516bb..d6367e5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java @@ -42,24 +42,26 @@ public class RowCounterCLI { public static void main(String[] args) throws IOException { if (args == null || args.length != 3) { - System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-latest.jar org.apache.kylin.job.tools.RowCounterCLI [HTABLE_NAME] [STARTKEY] [ENDKEY]"); + logger.info( + "Usage: hbase org.apache.hadoop.util.RunJar kylin-job-latest.jar org.apache.kylin.job.tools.RowCounterCLI [HTABLE_NAME] [STARTKEY] [ENDKEY]"); + return; // if no enough arguments provided, return with above message } - System.out.println(args[0]); + logger.info(args[0]); String htableName = args[0]; - System.out.println(args[1]); + logger.info(args[1]); byte[] startKey = BytesUtil.fromReadableText(args[1]); - System.out.println(args[2]); + logger.info(args[2]); byte[] endKey = BytesUtil.fromReadableText(args[2]); if (startKey == null) { - System.out.println("startkey is null "); + logger.info("startkey is null "); } else { - System.out.println("startkey lenght: " + startKey.length); + logger.info("startkey lenght: {}", startKey.length); } - System.out.println("start key in binary: " + Bytes.toStringBinary(startKey)); - System.out.println("end key in binary: " + Bytes.toStringBinary(endKey)); + logger.info("start key in binary: {}", Bytes.toStringBinary(startKey)); + logger.info("end key in binary: {}", Bytes.toStringBinary(endKey)); Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); @@ -69,22 +71,19 @@ public class RowCounterCLI { scan.setStartRow(startKey); scan.setStopRow(endKey); - logger.info("My Scan " + scan.toString()); - - Connection conn = ConnectionFactory.createConnection(conf); - Table tableInterface = conn.getTable(TableName.valueOf(htableName)); - - Iterator<Result> iterator = tableInterface.getScanner(scan).iterator(); - int counter = 0; - while (iterator.hasNext()) { - iterator.next(); - counter++; - if (counter % 1000 == 1) { - System.out.println("number of rows: " + counter); + logger.info("My Scan {}", scan); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table tableInterface = conn.getTable(TableName.valueOf(htableName))) { + Iterator<Result> iterator = tableInterface.getScanner(scan).iterator(); + int counter = 0; + while (iterator.hasNext()) { + iterator.next(); + counter++; + if (counter % 1000 == 1) { + logger.info("number of rows: {}", counter); + } } + logger.info("number of rows: {}", counter); } - System.out.println("number of rows: " + counter); - tableInterface.close(); - conn.close(); } }