KYLIN-2135 minor format update


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dd496a69
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dd496a69
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dd496a69
Branch: refs/heads/KYLIN-2006
Commit: dd496a6945ba20192c2fa2bda845c055357ab44a
Parents: 7421403
Author: shaofengshi <shaofeng...@apache.org>
Authored: Thu Nov 3 18:49:50 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/DFSFileTableReader.java     | 92 ++++++++++----------
 .../engine/mr/steps/FactDistinctColumnsJob.java | 34 ++++----
 2 files changed, 61 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/dd496a69/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index dda1d6f..173c908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,14 +23,15 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -57,7 +58,7 @@ public class DFSFileTableReader implements TableReader {
 
     private String filePath;
     private String delim;
-    private List<RowReader> readerList;
+    private List<RowReader> readerList;
 
     private String curLine;
     private String[] curColumns;
@@ -72,33 +73,33 @@ public class DFSFileTableReader implements TableReader {
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;
-        this.readerList = new ArrayList<RowReader>();
+        this.readerList = new ArrayList<RowReader>();
 
         FileSystem fs = HadoopUtil.getFileSystem(filePath);
 
-        ArrayList<FileStatus> allFiles = new ArrayList<>();
-        FileStatus status = fs.getFileStatus(new Path(filePath));
-        if (status.isFile()) {
-            allFiles.add(status);
-        } else {
-            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
-            allFiles.addAll(Arrays.asList(listStatus));
-        }
-
-        try {
-            for (FileStatus f : allFiles) {
-                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
-                this.readerList.add(rowReader);
-            }
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(filePath));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        try {
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         } catch (IOException e) {
             if (isExceptionSayingNotSeqFile(e) == false)
                 throw e;
 
-            this.readerList = new ArrayList<RowReader>();
-            for (FileStatus f : allFiles) {
-                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
-                this.readerList.add(rowReader);
-            }
+            this.readerList = new ArrayList<RowReader>();
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         }
     }
 
@@ -114,20 +115,20 @@ public class DFSFileTableReader implements TableReader {
 
     @Override
     public boolean next() throws IOException {
-        int curReaderIndex = -1;
-        RowReader curReader;
-
-        while (++curReaderIndex < readerList.size()) {
-            curReader = readerList.get(curReaderIndex);
-            curLine = curReader.nextLine();
-            curColumns = null;
-
-            if (curLine != null) {
-                return true;
-            }
-        }
-
-        return false;
+        int curReaderIndex = -1;
+        RowReader curReader;
+
+        while (++curReaderIndex < readerList.size()) {
+            curReader = readerList.get(curReaderIndex);
+            curLine = curReader.nextLine();
+            curColumns = null;
+
+            if (curLine != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public String getLine() {
@@ -176,15 +177,10 @@ public class DFSFileTableReader implements TableReader {
     }
 
     @Override
-    public void close() {
-        for (RowReader reader : readerList) {
-            try {
-                if (reader != null)
-                    reader.close();
-            } catch (IOException e) {
-                logger.warn("close file failed:", e);
-            }
-        }
+    public void close() {
+        for (RowReader reader : readerList) {
+            IOUtils.closeQuietly(reader);
+        }
     }
 
     private String autoDetectDelim(String line) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/dd496a69/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 92da7d1..551ce33 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,27 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
-            int reducerCount = columnsNeedDict.size();
-            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
-
-            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
-            for(int index : uhcIndex) {
-                if(index == 1) {
-                    reducerCount += uhcReducerCount - 1;
-                }
-            }
-
-            if (reducerCount > 255) {
-                throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' ");
-            }
-
-
+            int reducerCount = columnsNeedDict.size();
+            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+            for(int index : uhcIndex) {
+                if(index == 1) {
+                    reducerCount += uhcReducerCount - 1;
+                }
+            }
+
+            if (reducerCount > 255) {
+                throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.job.uhc.reducer.count'");
+            }
+
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
-
+
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -114,7 +114,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 logger.info("Found segment: " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
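
A note for readers of the DFSFileTableReader hunk above: the constructor first tries to open every file under the input path as a Hadoop sequence file and, only if that fails with a "not a SequenceFile" error, discards the partial result and reopens all files as delimited text. Below is a minimal, self-contained sketch of that fallback pattern, not Kylin code: Opener, NotSeqFileException and the sample paths are hypothetical stand-ins (the real constructor builds SeqRowReader/CsvRowReader instances and detects the format mismatch by inspecting the IOException via isExceptionSayingNotSeqFile).

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FallbackOpenSketch {

    /** Stand-in for the "not a SequenceFile" condition the real code detects from the exception message. */
    static class NotSeqFileException extends IOException {
        NotSeqFileException(String msg) { super(msg); }
    }

    /** Stand-in for a factory that opens one file as a row reader. */
    interface Opener<T> {
        T open(String path) throws IOException;
    }

    /** Open every path with the primary opener; on a format mismatch, reopen all of them with the fallback. */
    static <T> List<T> openAll(List<String> paths, Opener<T> primary, Opener<T> fallback) throws IOException {
        List<T> readers = new ArrayList<>();
        try {
            for (String p : paths) {
                readers.add(primary.open(p));
            }
        } catch (NotSeqFileException e) {
            readers = new ArrayList<>();      // drop the partially built list, as the constructor above does
            for (String p : paths) {
                readers.add(fallback.open(p));
            }
        }
        return readers;
    }

    public static void main(String[] args) throws IOException {
        Opener<String> seq = p -> { throw new NotSeqFileException(p + " is not a sequence file"); };
        Opener<String> csv = p -> "csv:" + p;
        // The sequence-file attempt fails for the first path, so everything is reopened as CSV:
        System.out.println(openAll(List.of("part-00000", "part-00001"), seq, csv));
        // prints [csv:part-00000, csv:part-00001]
    }
}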
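
The FactDistinctColumnsJob hunk also carries a small piece of arithmetic worth spelling out: the job starts with one reducer per dictionary column, each ultra-high-cardinality (UHC) column is expanded to uhcReducerCount reducers instead of one, and the total must not exceed 255. A minimal sketch of that calculation under assumed inputs (dictColumnCount, uhcIndex and uhcReducerCount stand in for the values the job reads from the cube descriptor and its config):

public class ReducerCountSketch {

    static int reducerCount(int dictColumnCount, int[] uhcIndex, int uhcReducerCount) {
        int reducerCount = dictColumnCount;          // one reducer per dictionary column
        for (int flag : uhcIndex) {
            if (flag == 1) {                         // column flagged as ultra high cardinality
                reducerCount += uhcReducerCount - 1; // it gets uhcReducerCount reducers instead of one
            }
        }
        if (reducerCount > 255) {                    // same hard limit the job enforces above
            throw new IllegalArgumentException("max reducer number is 255, but now it is " + reducerCount);
        }
        return reducerCount;
    }

    public static void main(String[] args) {
        // 10 dictionary columns, two flagged UHC, 5 reducers per UHC column: 10 + 2 * (5 - 1) = 18
        System.out.println(reducerCount(10, new int[] { 0, 1, 0, 0, 1, 0, 0, 0, 0, 0 }, 5));
    }
}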