http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java index 0d8daa4..e8a93bd 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java @@ -88,23 +88,19 @@ public class CLIHiveClient implements IHiveClient { List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList(); List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList(); for (FieldSchema fieldSchema : allFields) { - allColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), - fieldSchema.getComment())); + allColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment())); } if (partitionFields != null && partitionFields.size() > 0) { for (FieldSchema fieldSchema : partitionFields) { - partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), - fieldSchema.getComment())); + partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment())); } } builder.setAllColumns(allColumns); builder.setPartitionColumns(partitionColumns); builder.setSdLocation(table.getSd().getLocation()); - builder.setFileSize( - getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE)); - builder.setFileNum( - getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES)); + builder.setFileSize(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE)); + builder.setFileNum(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES)); builder.setIsNative(!MetaStoreUtils.isNonNativeTable(table)); builder.setTableName(tableName); builder.setSdInputFormat(table.getSd().getInputFormat());
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index c907b44..15d4456 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -137,8 +137,7 @@ public class HiveMRInput implements IMRInput { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) - .getConfig(); + final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); JobEngineConfig conf = new JobEngineConfig(cubeConfig); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); @@ -169,8 +168,7 @@ public class HiveMRInput implements IMRInput { return step; } - private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, - String jobWorkingDir) { + private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); @@ -197,25 +195,21 @@ public class HiveMRInput implements IMRInput { if (lookUpTableDesc.isView()) { StringBuilder createIntermediateTableHql = new StringBuilder(); createIntermediateTableHql.append("DROP TABLE IF EXISTS " + intermediate + ";\n"); - createIntermediateTableHql - .append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n"); + createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n"); createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + intermediate + "';\n"); - createIntermediateTableHql - .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n"); + createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n"); hiveCmdBuilder.addStatement(createIntermediateTableHql.toString()); hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";"; } } - hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0, - hiveViewIntermediateTables.length() - 1); + hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0, hiveViewIntermediateTables.length() - 1); step.setCmd(hiveCmdBuilder.build()); return step; } - private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, - String cubeName) { + private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, String cubeName) { final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc); @@ -302,12 +296,10 @@ public class HiveMRInput implements IMRInput { logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount); if (rowCount == 0) { if (!config.isEmptySegmentAllowed()) { - stepLogger.log("Detect upstream hive table is empty, " - + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\""); + stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\""); return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); } else { - return new ExecuteResult(ExecuteResult.State.SUCCEED, - "Row count is 0, no need to redistribute"); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute"); } } @@ -384,8 +376,7 @@ public class HiveMRInput implements IMRInput { config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); output.append("Hive table " + hiveTable + " is dropped. \n"); rmdirOnHDFS(getExternalDataPath()); - output.append( - "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n"); + output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n"); } return output.toString(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java index 5fff000..14ed1f9 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java @@ -20,6 +20,7 @@ package org.apache.kylin.source.hive; import java.io.IOException; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.mr.DFSFileTable; @@ -77,7 +78,7 @@ public class HiveTable implements IReadableTable { throw new IOException(e); } } - + @Override public boolean exists() { return true; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java index 089850a..fa9eb29 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java @@ -34,8 +34,7 @@ class HiveTableMeta { @Override public String toString() { - return "HiveTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='" - + comment + '\'' + '}'; + return "HiveTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='" + comment + '\'' + '}'; } } @@ -53,9 +52,7 @@ class HiveTableMeta { List<HiveTableColumnMeta> allColumns; List<HiveTableColumnMeta> partitionColumns; - public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner, - String tableType, int lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount, - boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) { + public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner, String tableType, int lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount, boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) { this.tableName = tableName; this.sdLocation = sdLocation; this.sdInputFormat = sdInputFormat; @@ -73,10 +70,6 @@ class HiveTableMeta { @Override public String toString() { - return "HiveTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\'' - + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\'' - + ", owner='" + owner + '\'' + ", tableType='" + tableType + '\'' + ", lastAccessTime=" + lastAccessTime - + ", fileSize=" + fileSize + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" - + allColumns + ", partitionColumns=" + partitionColumns + '}'; + return "HiveTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\'' + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\'' + ", owner='" + owner + '\'' + ", tableType='" + tableType + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns + ", partitionColumns=" + partitionColumns + '}'; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java index 6fedd8b..073ded5 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java @@ -106,7 +106,6 @@ public class HiveTableMetaBuilder { } public HiveTableMeta createHiveTableMeta() { - return new HiveTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, tableType, lastAccessTime, - fileSize, fileNum, skipHeaderLineCount, isNative, allColumns, partitionColumns); + return new HiveTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, tableType, lastAccessTime, fileSize, fileNum, skipHeaderLineCount, isNative, allColumns, partitionColumns); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java index 48e0ee3..75f322f 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java @@ -143,8 +143,7 @@ public class HiveTableReader implements TableReader { return "hive table reader for: " + dbName + "." + tableName; } - private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) - throws Exception { + private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception { HiveConf hiveConf = new HiveConf(HiveTableReader.class); Iterator<Entry<String, String>> itr = hiveConf.iterator(); Map<String, String> map = new HashMap<String, String>(); @@ -157,8 +156,7 @@ public class HiveTableReader implements TableReader { if (partitionKV == null || partitionKV.size() == 0) { entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build(); } else { - entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV) - .build(); + entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build(); } HCatReader reader = DataTransferFactory.getHCatReader(entity, map); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java index 9db65042..22bea46 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java @@ -18,11 +18,11 @@ package org.apache.kylin.source.hive; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; + import java.io.IOException; import java.util.List; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; - public interface IHiveClient { void executeHQL(String hql) throws CommandNeedRetryException, IOException; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index 52730bd..ffd54db 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -71,7 +71,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab ColumnDesc[] columns = tableDesc.getColumns(); Collection<String[]> valuesCollection = tableInputFormat.parseMapperInput(value); - for (String[] values : valuesCollection) { + for (String[] values: valuesCollection) { for (int m = 0; m < columns.length; m++) { String field = columns[m].getName(); String fieldValue = values[m]; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java index 3724ef7..0648960 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java @@ -49,8 +49,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri } @Override - public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context) - throws IOException, InterruptedException { + public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { int skey = key.get(); for (BytesWritable v : values) { ByteBuffer buffer = ByteBuffer.wrap(v.getBytes()); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java index 7179a66..f439ccb 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java @@ -50,8 +50,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob { public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job"; @SuppressWarnings("static-access") - protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true) - .withDescription("The hive table name").create("table"); + protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table"); public HiveColumnCardinalityJob() { } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java index 5f48523..246822c 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java @@ -52,8 +52,7 @@ public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob { public static final String JOB_TITLE = "Kylin Hive Column Cardinality Update Job"; @SuppressWarnings("static-access") - protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true) - .withDescription("The hive table name").create("table"); + protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table"); private String table; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java index 21dac70..4bcd572 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java @@ -18,10 +18,10 @@ package org.apache.kylin.source.kafka; -import java.util.Map; - import org.apache.commons.lang3.StringUtils; +import java.util.Map; + /** */ public class DefaultTimeParser extends AbstractTimeParser { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java index 650f57e..50295c3 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java @@ -73,8 +73,7 @@ public class KafkaConfigManager { } @Override - public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) - throws IOException { + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { if (event == Event.DROP) removeKafkaConfigLocal(cacheKey); else @@ -218,13 +217,11 @@ public class KafkaConfigManager { private void reloadAllKafkaConfig() throws IOException { ResourceStore store = getStore(); - logger.info("Reloading Kafka Metadata from folder " - + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT)); + logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT)); kafkaMap.clear(); - List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT, - MetadataConstants.FILE_SURFIX); + List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX); for (String path : paths) { KafkaConfig kafkaConfig; try { @@ -234,8 +231,7 @@ public class KafkaConfigManager { continue; } if (path.equals(kafkaConfig.getResourcePath()) == false) { - logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at " - + kafkaConfig.getResourcePath()); + logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at " + kafkaConfig.getResourcePath()); continue; } if (kafkaMap.containsKey(kafkaConfig.getName())) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 5815d53..3323afb 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -78,14 +78,13 @@ public class KafkaMRInput implements IMRInput { public IMRTableInputFormat getTableInputFormat(TableDesc table) { KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity()); - List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), - new Function<ColumnDesc, TblColRef>() { - @Nullable - @Override - public TblColRef apply(ColumnDesc input) { - return input.getRef(); - } - }); + List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() { + @Nullable + @Override + public TblColRef apply(ColumnDesc input) { + return input.getRef(); + } + }); return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null); } @@ -100,13 +99,11 @@ public class KafkaMRInput implements IMRInput { private StreamingParser streamingParser; private final JobEngineConfig conf; - public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, - JobEngineConfig conf) { + public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) { this.cubeSegment = cubeSegment; this.conf = conf; try { - streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), - kafkaConfig.getParserProperties(), columns); + streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); } catch (ReflectiveOperationException e) { throw new IllegalArgumentException(e); } @@ -117,8 +114,7 @@ public class KafkaMRInput implements IMRInput { job.setInputFormatClass(SequenceFileInputFormat.class); String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID); IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment); - String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, - JobBuilderSupport.getJobWorkingDir(conf, jobId)); + String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); try { FileInputFormat.addInputPath(job, new Path(inputPath)); } catch (IOException e) { @@ -130,10 +126,10 @@ public class KafkaMRInput implements IMRInput { public Collection<String[]> parseMapperInput(Object mapperInput) { Text text = (Text) mapperInput; ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()); - List<StreamingMessageRow> streamingMessageRowList = streamingParser.parse(buffer); + List<StreamingMessageRow> streamingMessageRowList = streamingParser.parse(buffer); List<String[]> parsedDataCollection = new ArrayList<>(); - for (StreamingMessageRow row : streamingMessageRowList) { + for (StreamingMessageRow row: streamingMessageRowList) { parsedDataCollection.add(row.getData().toArray(new String[row.getData().size()])); } @@ -162,19 +158,16 @@ public class KafkaMRInput implements IMRInput { MapReduceExecutable result = new MapReduceExecutable(); IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg); - outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, - JobBuilderSupport.getJobWorkingDir(conf, jobId)); + outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); result.setName("Save data from Kafka"); result.setMapReduceJobClass(KafkaFlatTableJob.class); JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system"); StringBuilder cmd = new StringBuilder(); jobBuilderSupport.appendMapReduceParameters(cmd); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, - seg.getRealization().getName()); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, - "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"); result.setMapReduceParams(cmd.toString()); return result; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index fc0d50d..52d2e6f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -32,8 +32,8 @@ import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.streaming.StreamingConfig; -import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.ISource; +import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -71,25 +71,20 @@ public class KafkaSource implements ISource { if (result.getStartOffset() == 0) { final CubeSegment last = cube.getLastSegment(); if (last != null) { - logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " - + last.getSourcePartitionOffsetEnd()); + logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd()); // from last seg's end position result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd()); - } else if (cube.getDescriptor().getPartitionOffsetStart() != null - && cube.getDescriptor().getPartitionOffsetStart().size() > 0) { - logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " - + cube.getDescriptor().getPartitionOffsetStart()); + } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) { + logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart()); result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart()); } else { // from the topic's earliest offset; - logger.debug( - "Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset."); + logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset."); result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube)); } } - final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()) - .getKafkaConfig(cube.getRootFactTable()); + final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getRootFactTable()); final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); final String topic = kafkaConfig.getTopic(); try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { @@ -113,9 +108,7 @@ public class KafkaSource implements ISource { for (Integer partitionId : latestOffsets.keySet()) { if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) { if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) { - throw new IllegalArgumentException("Partition " + partitionId + " end offset (" - + latestOffsets.get(partitionId) + ") is smaller than start offset ( " - + result.getSourcePartitionOffsetStart().get(partitionId) + ")"); + throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")"); } } else { throw new IllegalStateException("New partition added in between, retry."); @@ -133,8 +126,7 @@ public class KafkaSource implements ISource { } if (totalStartOffset > totalEndOffset) { - throw new IllegalArgumentException( - "Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset); + throw new IllegalArgumentException("Illegal offset: start: " + totalStartOffset + ", end: " + totalEndOffset); } if (totalStartOffset == totalEndOffset) { @@ -159,8 +151,7 @@ public class KafkaSource implements ISource { if (startOffset > 0) { if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) { - throw new IllegalArgumentException( - "When 'startOffset' is > 0, need provide each partition's start offset"); + throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset"); } long totalOffset = 0; @@ -169,15 +160,13 @@ public class KafkaSource implements ISource { } if (totalOffset != startOffset) { - throw new IllegalArgumentException( - "Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'"); + throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'"); } } if (endOffset > 0 && endOffset != Long.MAX_VALUE) { if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) { - throw new IllegalArgumentException( - "When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset"); + throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset"); } long totalOffset = 0; @@ -186,8 +175,7 @@ public class KafkaSource implements ISource { } if (totalOffset != endOffset) { - throw new IllegalArgumentException( - "Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'"); + throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'"); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index 1459c2d..2e3c11c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -19,20 +19,20 @@ package org.apache.kylin.source.kafka; import java.lang.reflect.Constructor; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; +import java.nio.ByteBuffer; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.StreamingMessageRow; import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.metadata.model.TblColRef; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - /** * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, Map properties) as params */ @@ -68,8 +68,7 @@ public abstract class StreamingParser { abstract public boolean filter(StreamingMessageRow streamingMessageRow); - public static StreamingParser getStreamingParser(String parserName, String parserProperties, - List<TblColRef> columns) throws ReflectiveOperationException { + public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException { if (!StringUtils.isEmpty(parserName)) { logger.info("Construct StreamingParse {} with properties {}", parserName, parserProperties); Class clazz = Class.forName(parserName); @@ -77,8 +76,7 @@ public abstract class StreamingParser { Constructor constructor = clazz.getConstructor(List.class, Map.class); return (StreamingParser) constructor.newInstance(columns, properties); } else { - throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " - + parserProperties + "."); + throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + "."); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index beed6f7..de167b4 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -21,14 +21,15 @@ package org.apache.kylin.source.kafka; import java.io.IOException; import java.lang.reflect.Constructor; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; +import java.util.ArrayList; import java.util.Map; +import java.util.HashMap; import java.util.TreeMap; +import java.util.Collections; +import java.util.Arrays; +import com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.ByteBufferBackedInputStream; import org.apache.kylin.common.util.StreamingMessageRow; @@ -36,7 +37,6 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.MapType; @@ -68,8 +68,7 @@ public final class TimedJsonStreamParser extends StreamingParser { private final Map<String, Object> tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); private final Map<String, String[]> nameMap = new HashMap<>(); - private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), - SimpleType.construct(Object.class)); + private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class)); private AbstractTimeParser streamTimeParser; @@ -89,12 +88,10 @@ public final class TimedJsonStreamParser extends StreamingParser { Constructor constructor = clazz.getConstructor(Map.class); streamTimeParser = (AbstractTimeParser) constructor.newInstance(properties); } catch (Exception e) { - throw new IllegalStateException( - "Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e); + throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e); } } else { - throw new IllegalStateException( - "Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + "."); + throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + "."); } mapper = new ObjectMapper(); mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); @@ -119,8 +116,7 @@ public final class TimedJsonStreamParser extends StreamingParser { } } - StreamingMessageRow streamingMessageRow = new StreamingMessageRow(result, 0, t, - Collections.<String, Object> emptyMap()); + StreamingMessageRow streamingMessageRow = new StreamingMessageRow(result, 0, t, Collections.<String, Object>emptyMap()); List<StreamingMessageRow> messageRowList = new ArrayList<StreamingMessageRow>(); messageRowList.add(streamingMessageRow); return messageRowList; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java index 1e75763..fc3bba0 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/BrokerConfig.java @@ -18,15 +18,15 @@ package org.apache.kylin.source.kafka.config; -import java.io.Serializable; - import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.Serializable; + /** */ @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class BrokerConfig implements Serializable { +public class BrokerConfig implements Serializable{ @JsonProperty("id") private int id; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java index 44be966..afe888f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java @@ -39,8 +39,7 @@ import kafka.cluster.Broker; */ @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class KafkaClusterConfig extends RootPersistentEntity { - public static Serializer<KafkaClusterConfig> SERIALIZER = new JsonSerializer<KafkaClusterConfig>( - KafkaClusterConfig.class); + public static Serializer<KafkaClusterConfig> SERIALIZER = new JsonSerializer<KafkaClusterConfig>(KafkaClusterConfig.class); @JsonProperty("brokers") private List<BrokerConfig> brokerConfigs; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java index b073921..cc32ed9 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java @@ -57,8 +57,7 @@ public class KafkaConsumerProperties { KafkaConsumerProperties config = new KafkaConsumerProperties(); config.properties = config.loadKafkaConsumerProperties(); - logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " - + System.identityHashCode(config)); + logger.info("Initialized a new KafkaConsumerProperties from getInstanceFromEnv : " + System.identityHashCode(config)); ENV_INSTANCE = config; } catch (IllegalArgumentException e) { throw new IllegalStateException("Failed to find KafkaConsumerProperties ", e); @@ -82,14 +81,8 @@ public class KafkaConsumerProperties { configNames = ConsumerConfig.configNames(); } catch (Error e) { // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException which is an Error, not Exception - String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," - + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," - + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," - + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," - + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," - + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms," - + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset") - .split(","); + String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms," + + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(","); configNames.addAll(Arrays.asList(configNamesArray)); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index b3a0f19..11466e5 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -97,8 +97,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { String topic = kafkaConfig.getTopic(); if (brokers == null || brokers.length() == 0 || topic == null) { - throw new IllegalArgumentException( - "Invalid Kafka information, brokers " + brokers + ", topic " + topic); + throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic); } JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); @@ -144,7 +143,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString()); job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString()); - for (Integer partition : offsetStart.keySet()) { + for(Integer partition: offsetStart.keySet()) { job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, offsetStart.get(partition).toString()); job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, offsetEnd.get(partition).toString()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index 71f823f..c996c5f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -69,11 +69,9 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf); final List<InputSplit> splits = new ArrayList<InputSplit>(); - try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, - kafkaProperties)) { + try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) { final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic); - Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), - "partition number mismatch with server side"); + Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side"); for (int i = 0; i < partitionInfos.size(); i++) { final PartitionInfo partition = partitionInfos.get(i); int partitionId = partition.partition(); @@ -82,8 +80,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { } if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) { - InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, - startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)); + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)); splits.add(split); } } @@ -92,8 +89,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { } @Override - public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) - throws IOException, InterruptedException { + public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { return new KafkaInputRecordReader(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java index c1bb625..c22c72f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java @@ -89,15 +89,13 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf); - consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, - kafkaProperties); + consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties); earliestOffset = this.split.getOffsetStart(); latestOffset = this.split.getOffsetEnd(); TopicPartition topicPartition = new TopicPartition(topic, partition); consumer.assign(Arrays.asList(topicPartition)); - log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", - new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset }); + log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset }); } @Override @@ -122,9 +120,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit iterator = messages.iterator(); if (!iterator.hasNext()) { log.info("No more messages, stop"); - throw new IOException( - String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", - latestOffset, watermark)); + throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark)); } } @@ -143,8 +139,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit } log.error("Unexpected iterator end."); - throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", - latestOffset, watermark)); + throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark)); } @Override @@ -167,8 +162,7 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit @Override public void close() throws IOException { - log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition, - numProcessedMessages); + log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition, numProcessedMessages); consumer.close(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java index c8a0110..3261399 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java @@ -72,7 +72,7 @@ public class KafkaInputSplit extends InputSplit implements Writable { @Override public String[] getLocations() throws IOException, InterruptedException { - return new String[] { brokers }; + return new String[]{brokers}; } public int getPartition() { @@ -99,4 +99,4 @@ public class KafkaInputSplit extends InputSplit implements Writable { public String toString() { return brokers + "-" + topic + "-" + partition + "-" + offsetStart + "-" + offsetEnd; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java index d357d91..914fca2 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import com.google.common.base.Preconditions; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -33,8 +34,6 @@ import org.apache.kylin.job.execution.ExecuteResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - /** */ public class MergeOffsetStep extends AbstractExecutable { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index 56d3687..bd8f90e 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -17,11 +17,7 @@ */ package org.apache.kylin.source.kafka.util; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; - +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -34,7 +30,10 @@ import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; -import com.google.common.collect.Maps; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; /** */ @@ -46,8 +45,7 @@ public class KafkaClient { return consumer; } - private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, - Properties properties) { + private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) { Properties props = new Properties(); if (properties != null) { for (Map.Entry entry : properties.entrySet()) { @@ -99,8 +97,7 @@ public class KafkaClient { } public static Map<Integer, Long> getLatestOffsets(final CubeInstance cubeInstance) { - final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()) - .getKafkaConfig(cubeInstance.getRootFactTable()); + final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable()); final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); final String topic = kafkaConfig.getTopic(); @@ -116,9 +113,9 @@ public class KafkaClient { return startOffsets; } + public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) { - final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()) - .getKafkaConfig(cubeInstance.getRootFactTable()); + final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getRootFactTable()); final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); final String topic = kafkaConfig.getTopic(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java index fc04f62..4b91e03 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java @@ -48,12 +48,9 @@ public class KafkaSampleProducer { private static final Logger logger = LoggerFactory.getLogger(KafkaSampleProducer.class); @SuppressWarnings("static-access") - private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true) - .withDescription("Kafka topic").create("topic"); - private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true) - .withDescription("Kafka broker").create("broker"); - private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false) - .withDescription("Simulated message interval in mili-seconds, default 1000").create("interval"); + private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic"); + private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker"); + private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval"); private static final ObjectMapper mapper = new ObjectMapper(); @@ -134,8 +131,7 @@ public class KafkaSampleProducer { user.put("age", rnd.nextInt(20) + 10); record.put("user", user); //send message - ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", - mapper.writeValueAsString(record)); + ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record)); System.out.println("Sending 1 message: " + JsonUtil.writeValueAsString(record)); producer.send(data); Thread.sleep(interval); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java index 2339862..8dc840b 100644 --- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java @@ -45,8 +45,7 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { private static String[] userNeedColNames; private static final String jsonFilePath = "src/test/resources/message.json"; private static ObjectMapper mapper; - private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), - SimpleType.construct(Object.class)); + private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class)); @BeforeClass public static void setUp() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java index cc94a35..208fdb6 100644 --- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java @@ -52,17 +52,14 @@ public class KafkaConsumerPropertiesTest extends LocalFileMetadataTestCase { KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); assertFalse(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("acks")); assertTrue(kafkaConsumerProperties.extractKafkaConfigToProperties().containsKey("session.timeout.ms")); - assertEquals("30000", - kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms")); + assertEquals("30000", kafkaConsumerProperties.extractKafkaConfigToProperties().getProperty("session.timeout.ms")); } @Test - public void testLoadKafkaPropertiesAsHadoopJobConf() - throws IOException, ParserConfigurationException, SAXException { + public void testLoadKafkaPropertiesAsHadoopJobConf() throws IOException, ParserConfigurationException, SAXException { KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv(); Configuration conf = new Configuration(false); - conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), - KafkaConsumerProperties.KAFKA_CONSUMER_FILE); + conf.addResource(new FileInputStream(new File(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf())), KafkaConsumerProperties.KAFKA_CONSUMER_FILE); assertEquals("30000", conf.get("session.timeout.ms")); Properties prop = KafkaConsumerProperties.extractKafkaConfigToProperties(conf); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index 5b4126f..6580107 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -104,8 +104,7 @@ public class HBaseConnection { int coreThreads = config.getHBaseCoreConnectionThreads(); long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds(); LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100); - ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, - workQueue, // + ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, // Threads.newDaemonThreadFactory("kylin-coproc-")); tpe.allowCoreThreadTimeOut(true); @@ -145,8 +144,7 @@ public class HBaseConnection { private static Configuration newHBaseConfiguration(StorageURL url) { // using a hbase:xxx URL is deprecated, instead hbase config is always loaded from hbase-site.xml in classpath if (!"hbase".equals(url.getScheme())) - throw new IllegalArgumentException( - "to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties"); + throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.storage.url=hbase' in kylin.properties"); Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()); addHBaseClusterNNHAConfiguration(conf); @@ -165,7 +163,7 @@ public class HBaseConnection { if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) { conf.set("hbase.fs.tmp.dir", "/tmp"); } - + for (Entry<String, String> entry : url.getAllParameters().entrySet()) { conf.set(entry.getKey(), entry.getValue()); } @@ -266,8 +264,7 @@ public class HBaseConnection { return tableExists(HBaseConnection.get(hbaseUrl), tableName); } - public static void createHTableIfNeeded(StorageURL hbaseUrl, String tableName, String... families) - throws IOException { + public static void createHTableIfNeeded(StorageURL hbaseUrl, String tableName, String... families) throws IOException { createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families); } @@ -280,7 +277,7 @@ public class HBaseConnection { TableName tableName = TableName.valueOf(table); DistributedLock lock = null; String lockPath = getLockPath(table); - + try { if (tableExists(conn, table)) { logger.debug("HTable '" + table + "' already exists"); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index f5f40a1..a2e0229 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -149,11 +149,9 @@ public class HBaseResourceStore extends ResourceStore { @Override public String getMetaStoreUUID() throws IOException { if (!exists(ResourceStore.METASTORE_UUID_TAG)) { - putResource(ResourceStore.METASTORE_UUID_TAG, new StringEntity(createMetaStoreUUID()), 0, - StringEntity.serializer); + putResource(ResourceStore.METASTORE_UUID_TAG, new StringEntity(createMetaStoreUUID()), 0, StringEntity.serializer); } - StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.class, - StringEntity.serializer); + StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.class, StringEntity.serializer); return entity.toString(); } @@ -204,8 +202,7 @@ public class HBaseResourceStore extends ResourceStore { } @Override - protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) - throws IOException { + protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException { FilterList filter = generateTimeFilterList(timeStart, timeEndExclusive); final List<RawResource> result = Lists.newArrayList(); try { @@ -229,13 +226,11 @@ public class HBaseResourceStore extends ResourceStore { private FilterList generateTimeFilterList(long timeStart, long timeEndExclusive) { FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); if (timeStart != Long.MIN_VALUE) { - SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, - CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(timeStart)); + SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(timeStart)); filterList.addFilter(timeStartFilter); } if (timeEndExclusive != Long.MAX_VALUE) { - SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, - CompareFilter.CompareOp.LESS, Bytes.toBytes(timeEndExclusive)); + SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS, Bytes.toBytes(timeEndExclusive)); filterList.addFilter(timeEndFilter); } return filterList.getFilters().size() == 0 ? null : filterList; @@ -296,8 +291,7 @@ public class HBaseResourceStore extends ResourceStore { } @Override - protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) - throws IOException, IllegalStateException { + protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); @@ -305,12 +299,10 @@ public class HBaseResourceStore extends ResourceStore { Put put = buildPut(resPath, newTS, row, content, table); boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); - logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS - + ", operation result: " + ok); + logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok); if (!ok) { long real = getResourceTimestampImpl(resPath); - throw new IllegalStateException( - "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); + throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); } return newTS; @@ -363,8 +355,7 @@ public class HBaseResourceStore extends ResourceStore { } - private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) - throws IOException { + private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { byte[] rowkey = Bytes.toBytes(path); Get get = new Get(rowkey); @@ -409,8 +400,7 @@ public class HBaseResourceStore extends ResourceStore { } private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException { - int kvSizeLimit = Integer - .parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760")); + int kvSizeLimit = Integer.parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760")); if (content.length > kvSizeLimit) { writeLargeCellToHdfs(resPath, content, table); content = BytesUtil.EMPTY_BYTE_ARRAY; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java index 0d44adc..fc6f878 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java @@ -50,16 +50,14 @@ public class HBaseStorage implements IStorage { CubeInstance cubeInstance = (CubeInstance) realization; String cubeStorageQuery; if (cubeInstance.getStorageType() == IStorageAware.ID_HBASE) {//v2 query engine cannot go with v1 storage now - throw new IllegalStateException( - "Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more"); + throw new IllegalStateException("Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more"); } else { cubeStorageQuery = v2CubeStorageQuery;//by default use v2 } IStorageQuery ret; try { - ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class) - .newInstance((CubeInstance) realization); + ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class).newInstance((CubeInstance) realization); } catch (Exception e) { throw new RuntimeException("Failed to initialize storage query for " + cubeStorageQuery, e); } @@ -72,13 +70,11 @@ public class HBaseStorage implements IStorage { private static TblColRef getPartitionCol(IRealization realization) { String modelName = realization.getModel().getName(); - DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()) - .getDataModelDesc(modelName); + DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName); PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc(); Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!"); TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef(); - Preconditions.checkArgument(partitionColRef != null, - "getPartitionDateColumnRef for " + realization + " is null"); + Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null"); return partitionColRef; } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java index 4d69925..25abdfb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggrKey.java @@ -124,8 +124,7 @@ public class AggrKey implements Comparable<AggrKey> { return comp; for (int i = 0; i < groupByMaskSet.length; i++) { - comp = BytesUtil.compareByteUnsigned(this.data[this.offset + groupByMaskSet[i]], - o.data[o.offset + groupByMaskSet[i]]); + comp = BytesUtil.compareByteUnsigned(this.data[this.offset + groupByMaskSet[i]], o.data[o.offset + groupByMaskSet[i]]); if (comp != 0) return comp; } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java index 386564a..2a85894 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/AggregationCache.java @@ -77,16 +77,13 @@ public abstract class AggregationCache { int size = aggBufMap.size(); long memUsage = (40L + rowMemBytes) * size; if (memUsage > MEMORY_USAGE_CAP) { - throw new RuntimeException("Kylin coprocessor memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " - + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor."); + throw new RuntimeException("Kylin coprocessor memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abort coprocessor."); } //If less than 5% of max memory long avail = MemoryBudgetController.getSystemAvailBytes(); if (avail < (MEMOERY_MAX_BYTES / 20)) { - throw new RuntimeException( - "Running Kylin coprocessor when too little memory is left. Abort coprocessor. Current available memory is " - + avail + ". Max memory is " + MEMOERY_MAX_BYTES); + throw new RuntimeException("Running Kylin coprocessor when too little memory is left. Abort coprocessor. Current available memory is " + avail + ". Max memory is " + MEMOERY_MAX_BYTES); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java index 2b3b91b..63e3bdb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorFilter.java @@ -33,8 +33,7 @@ import org.apache.kylin.metadata.tuple.IEvaluatableTuple; */ public class CoprocessorFilter { - public static CoprocessorFilter fromFilter(final IDimensionEncodingMap dimEncMap, TupleFilter rootFilter, - FilterDecorator.FilterConstantsTreatment filterConstantsTreatment) { + public static CoprocessorFilter fromFilter(final IDimensionEncodingMap dimEncMap, TupleFilter rootFilter, FilterDecorator.FilterConstantsTreatment filterConstantsTreatment) { // translate constants into dictionary IDs via a serialize copy FilterDecorator filterDecorator = new FilterDecorator(dimEncMap, filterConstantsTreatment); byte[] bytes = TupleFilterSerializer.serialize(rootFilter, filterDecorator, DictCodeSystem.INSTANCE); @@ -44,8 +43,7 @@ public class CoprocessorFilter { } public static byte[] serialize(CoprocessorFilter o) { - return (o.filter == null) ? BytesUtil.EMPTY_BYTE_ARRAY - : TupleFilterSerializer.serialize(o.filter, DictCodeSystem.INSTANCE); + return (o.filter == null) ? BytesUtil.EMPTY_BYTE_ARRAY : TupleFilterSerializer.serialize(o.filter, DictCodeSystem.INSTANCE); } public static CoprocessorFilter deserialize(byte[] filterBytes) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java index 65215f6..f6332f4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java @@ -36,8 +36,7 @@ import org.apache.kylin.metadata.model.TblColRef; */ public class CoprocessorProjector { - public static CoprocessorProjector makeForObserver(final CubeSegment cubeSegment, final Cuboid cuboid, - final Collection<TblColRef> dimensionColumns) { + public static CoprocessorProjector makeForObserver(final CubeSegment cubeSegment, final Cuboid cuboid, final Collection<TblColRef> dimensionColumns) { RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) { @Override @@ -46,8 +45,7 @@ public class CoprocessorProjector { } @Override - protected void fillColumnValue(TblColRef column, int columnLen, String valueStr, byte[] outputValue, - int outputValueOffset) { + protected void fillColumnValue(TblColRef column, int columnLen, String valueStr, byte[] outputValue, int outputValueOffset) { byte bits = dimensionColumns.contains(column) ? (byte) 0xff : 0x00; Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, bits); } @@ -56,6 +54,7 @@ public class CoprocessorProjector { byte[] mask = rowKeyMaskEncoder.encode(new String[cuboid.getColumns().size()]); return new CoprocessorProjector(mask, dimensionColumns.size() != 0); } + public static byte[] serialize(CoprocessorProjector o) { ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);