Repository: kylin Updated Branches: refs/heads/KYLIN-2202 [created] 19184d2e6
KYLIN-2202 code review Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/19184d2e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/19184d2e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/19184d2e Branch: refs/heads/KYLIN-2202 Commit: 19184d2e67c5e48131bcc021dcc7307dc789f77a Parents: 873f903 Author: Li Yang <liy...@apache.org> Authored: Thu Nov 24 14:52:33 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Nov 24 14:52:33 2016 +0800 ---------------------------------------------------------------------- .../validation/rule/DictionaryRuleTest.java | 1 - .../apache/kylin/dict/DictionaryGenerator.java | 7 +- .../apache/kylin/dict/DictionaryManager.java | 4 - .../dict/NumberDictionaryForestBuilder.java | 4 - .../kylin/dict/TrieDictionaryForestBuilder.java | 3 +- .../apache/kylin/engine/mr/DFSFileTable.java | 35 --- .../engine/mr/DFSSingleFileTableReader.java | 218 ------------------- .../apache/kylin/engine/mr/SortedColumn.java | 100 --------- .../kylin/engine/mr/SortedColumnDFSFile.java | 129 +++++++++++ .../engine/mr/SortedColumnDFSFileReader.java | 136 ++++++++++++ .../kylin/engine/mr/SortedColumnReader.java | 136 ------------ .../engine/mr/steps/CreateDictionaryJob.java | 6 +- .../mr/steps/FactDistinctColumnsReducer.java | 3 - .../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +- .../engine/mr/steps/SelfDefineSortableKey.java | 25 ++- .../apache/kylin/engine/mr/steps/TypeFlag.java | 28 --- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 4 +- .../kylin/engine/mr/SortedColumnReaderTest.java | 10 +- .../mr/steps/NumberDictionaryForestTest.java | 1 + .../mr/steps/SelfDefineSortableKeyTest.java | 93 ++++---- .../kylin/source/hive/HiveTableReader.java | 3 - 21 files changed, 342 insertions(+), 616 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java index 8bd4c88..9b37507 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java @@ -34,7 +34,6 @@ import org.apache.kylin.cube.model.validation.ValidateContext; import org.apache.kylin.dict.GlobalDictionaryBuilder; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java index 20a57ba..810a392 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java @@ -135,8 +135,7 @@ public class DictionaryGenerator { private static class StringDictBuilder implements IDictionaryBuilder { @Override public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException { - int maxTrieSizeInMB = TrieDictionaryForestBuilder.getMaxTrieSizeInMB(); - TrieDictionaryForestBuilder builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId, maxTrieSizeInMB); + TrieDictionaryForestBuilder builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId); String value; while (valueEnumerator.moveNext()) { value = valueEnumerator.current(); @@ -153,9 +152,7 @@ public class DictionaryGenerator { private static class NumberDictBuilder implements IDictionaryBuilder { @Override public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException { - - int maxTrieSizeInMB = TrieDictionaryForestBuilder.getMaxTrieSizeInMB(); - NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder(baseId, maxTrieSizeInMB); + NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder(baseId); String value; while (valueEnumerator.moveNext()) { value = valueEnumerator.current(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 0608e6f..c33cd28 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -442,10 +442,6 @@ public class DictionaryManager { logger.info("DictionaryManager(" + System.identityHashCode(this) + ") loading DictionaryInfo(loadDictObj:" + loadDictObj + ") at " + resourcePath); DictionaryInfo info = store.getResource(resourcePath, DictionaryInfo.class, loadDictObj ? DictionaryInfoSerializer.FULL_SERIALIZER : DictionaryInfoSerializer.INFO_SERIALIZER); - //info.dictionaryObject.dump(System.out); - // if (loadDictObj) - // logger.debug("Loaded dictionary at " + resourcePath); - return info; } http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java index a68b18e..5502a74 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java @@ -20,16 +20,12 @@ package org.apache.kylin.dict; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.dict.NumberDictionary.NumberBytesCodec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Created by xiefan on 16-11-2. */ public class NumberDictionaryForestBuilder extends TrieDictionaryForestBuilder<String> { - private static final Logger logger = LoggerFactory.getLogger(NumberDictionaryForestBuilder.class); - public static class Number2BytesConverter implements BytesConverter<String> { static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = NumberDictionary.MAX_DIGITS_BEFORE_DECIMAL_POINT; http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java index 6aeb2ec..4ee30f0 100755 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java @@ -78,7 +78,6 @@ public class TrieDictionaryForestBuilder<T> { addValue(valueBytes); } - private void addValue(byte[] valueBytes) { ByteArray valueByteArray = new ByteArray(valueBytes); if (previousValue != null && isOrdered) { @@ -97,7 +96,7 @@ public class TrieDictionaryForestBuilder<T> { previousValue = valueByteArray; trieBuilder.addValue(valueBytes); curTreeSize += valueBytes.length; - + if (curTreeSize >= maxTrieTreeSize && isOrdered) { TrieDictionary<T> tree = trieBuilder.build(0); addTree(tree); http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java index 074b271..8c1f6bd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java @@ -18,12 +18,10 @@ package org.apache.kylin.engine.mr; -import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -57,7 +55,6 @@ public class DFSFileTable implements ReadableTable { } @Override - @Deprecated public TableReader getReader() throws IOException { return new DFSFileTableReader(path, delim, nColumns); } @@ -72,29 +69,6 @@ public class DFSFileTable implements ReadableTable { } } - public Collection<TableReader> getReaders() { - ArrayList<TableReader> readers = new ArrayList<>(); - try { - String filePath = HadoopUtil.fixWindowsPath(path); - FileSystem fs = HadoopUtil.getFileSystem(filePath); - ArrayList<FileStatus> allFiles = new ArrayList<>(); - FileStatus status = fs.getFileStatus(new Path(filePath)); - if (status.isFile()) { - allFiles.add(status); - } else { - FileStatus[] listStatus = fs.listStatus(new Path(filePath)); - allFiles.addAll(Arrays.asList(listStatus)); - } - for (FileStatus f : allFiles) { - DFSSingleFileTableReader reader = new DFSSingleFileTableReader(f.getPath().toString(), delim, nColumns); - readers.add(reader); - } - }catch (IOException e){ - e.printStackTrace(); - } - return readers; - } - @Override public String toString() { return path; @@ -123,13 +97,4 @@ public class DFSFileTable implements ReadableTable { return Pair.newPair(size, lastModified); } - private boolean isExceptionSayingNotSeqFile(IOException e) { - if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile")) - return true; - - if (e instanceof EOFException) // in case the file is very very small - return true; - - return false; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSSingleFileTableReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSSingleFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSSingleFileTableReader.java deleted file mode 100644 index 0a45188..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSSingleFileTableReader.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr; - -import java.io.BufferedReader; -import java.io.Closeable; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStreamReader; - -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.StringSplitter; -import org.apache.kylin.source.ReadableTable.TableReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tables are typically CSV or SEQ file. - * - * @author yangli9 - */ -public class DFSSingleFileTableReader implements TableReader { - - private static final Logger logger = LoggerFactory.getLogger(DFSSingleFileTableReader.class); - private static final char CSV_QUOTE = '"'; - private static final String[] DETECT_DELIMS = new String[] { "\177", "|", "\t", "," }; - - private String filePath; - private String delim; - private RowReader reader; - - private String curLine; - private String[] curColumns; - private int expectedColumnNumber = -1; // helps delimiter detection - - public DFSSingleFileTableReader(String filePath, int expectedColumnNumber) throws IOException { - this(filePath, DFSFileTable.DELIM_AUTO, expectedColumnNumber); - } - - public DFSSingleFileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException { - filePath = HadoopUtil.fixWindowsPath(filePath); - this.filePath = filePath; - this.delim = delim; - this.expectedColumnNumber = expectedColumnNumber; - - FileSystem fs = HadoopUtil.getFileSystem(filePath); - - try { - this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath); - - } catch (IOException e) { - if (isExceptionSayingNotSeqFile(e) == false) - throw e; - - this.reader = new CsvRowReader(fs, filePath); - } - } - - private boolean isExceptionSayingNotSeqFile(IOException e) { - if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile")) - return true; - - if (e instanceof EOFException) // in case the file is very very small - return true; - - return false; - } - - @Override - public boolean next() throws IOException { - curLine = reader.nextLine(); - curColumns = null; - return curLine != null; - } - - public String getLine() { - return curLine; - } - - @Override - public String[] getRow() { - if (curColumns == null) { - if (DFSFileTable.DELIM_AUTO.equals(delim)) - delim = autoDetectDelim(curLine); - - if (delim == null) - curColumns = new String[] { curLine }; - else - curColumns = split(curLine, delim); - } - return curColumns; - } - - private String[] split(String line, String delim) { - // FIXME CVS line should be parsed considering escapes - String[] str = StringSplitter.split(line, delim); - - // un-escape CSV - if (DFSFileTable.DELIM_COMMA.equals(delim)) { - for (int i = 0; i < str.length; i++) { - str[i] = unescapeCsv(str[i]); - } - } - - return str; - } - - private String unescapeCsv(String str) { - if (str == null || str.length() < 2) - return str; - - str = StringEscapeUtils.unescapeCsv(str); - - // unescapeCsv may not remove the outer most quotes - if (str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == CSV_QUOTE) - str = str.substring(1, str.length() - 1); - - return str; - } - - @Override - public void close() throws IOException { - if (reader != null) - reader.close(); - } - - private String autoDetectDelim(String line) { - if (expectedColumnNumber > 0) { - for (String delim : DETECT_DELIMS) { - if (StringSplitter.split(line, delim).length == expectedColumnNumber) { - logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line); - return delim; - } - } - } - - logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as a single value, for " + filePath); - return null; - } - - // ============================================================================ - - private interface RowReader extends Closeable { - String nextLine() throws IOException; // return null on EOF - } - - private class SeqRowReader implements RowReader { - Reader reader; - Writable key; - Text value; - - SeqRowReader(Configuration hconf, FileSystem fs, String path) throws IOException { - reader = new Reader(hconf, Reader.file(new Path(path))); - key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf); - value = new Text(); - } - - @Override - public String nextLine() throws IOException { - boolean hasNext = reader.next(key, value); - if (hasNext) - return Bytes.toString(value.getBytes(), 0, value.getLength()); - else - return null; - } - - @Override - public void close() throws IOException { - reader.close(); - } - } - - private class CsvRowReader implements RowReader { - BufferedReader reader; - - CsvRowReader(FileSystem fs, String path) throws IOException { - FSDataInputStream in = fs.open(new Path(path)); - reader = new BufferedReader(new InputStreamReader(in, "UTF-8")); - } - - @Override - public String nextLine() throws IOException { - return reader.readLine(); - } - - @Override - public void close() throws IOException { - reader.close(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumn.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumn.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumn.java deleted file mode 100644 index 46b13f4..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumn.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.engine.mr; - -import org.apache.kylin.dict.ByteComparator; -import org.apache.kylin.dict.StringBytesConverter; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.source.ReadableTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Comparator; - -/** - * Created by xiefan on 16-11-22. - * - * Read values from multi col files and ensure their order using a K-Way merge algorithm - * - * You need to ensure that values inside each file is sorted - */ -public class SortedColumn implements ReadableTable { - - private DFSFileTable dfsFileTable; - - private String path; - - private DataType dataType; - - private static final Logger logger = LoggerFactory.getLogger(SortedColumn.class); - - public SortedColumn(String path, DataType dataType) { - this.dfsFileTable = new DFSFileTable(path, -1); - this.dataType = dataType; - } - - @Override - public TableReader getReader() throws IOException { - final Comparator<String> comparator = getComparatorByType(dataType); - return new SortedColumnReader(dfsFileTable.getReaders(), comparator); - } - - @Override - public TableSignature getSignature() throws IOException { - return dfsFileTable.getSignature(); - } - - private Comparator<String> getComparatorByType(DataType type) { - Comparator<String> comparator; - if (!type.isNumberFamily()) { - comparator = new ByteComparator<>(new StringBytesConverter()); - } else if (type.isIntegerFamily()) { - comparator = new Comparator<String>() { - @Override - public int compare(String str1, String str2) { - try { - Long num1 = Long.parseLong(str1); - Long num2 = Long.parseLong(str2); - return num1.compareTo(num2); - } catch (NumberFormatException e) { - logger.error("NumberFormatException when parse integer family number.str1:" + str1 + " str2:" + str2); - e.printStackTrace(); - return 0; - } - } - }; - } else { - comparator = new Comparator<String>() { - @Override - public int compare(String str1, String str2) { - try { - Double num1 = Double.parseDouble(str1); - Double num2 = Double.parseDouble(str2); - return num1.compareTo(num2); - } catch (NumberFormatException e) { - logger.error("NumberFormatException when parse doul family number.str1:" + str1 + " str2:" + str2); - return 0; - } - } - }; - } - return comparator; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java new file mode 100644 index 0000000..327461b --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.engine.mr; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.dict.ByteComparator; +import org.apache.kylin.dict.StringBytesConverter; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.source.ReadableTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by xiefan on 16-11-22. + * + * Read values from multi col files and ensure their order using a K-Way merge algorithm + * + * You need to ensure that values inside each file is sorted + */ +public class SortedColumnDFSFile implements ReadableTable { + + private static final Logger logger = LoggerFactory.getLogger(SortedColumnDFSFile.class); + + private String dfsPath; + + private DFSFileTable dfsFileTable; + + private DataType dataType; + + public SortedColumnDFSFile(String path, DataType dataType) { + this.dfsPath = path; + this.dfsFileTable = new DFSFileTable(path, -1); + this.dataType = dataType; + } + + @Override + public TableReader getReader() throws IOException { + final Comparator<String> comparator = getComparatorByType(dataType); + + ArrayList<TableReader> readers = new ArrayList<>(); + try { + String filePath = HadoopUtil.fixWindowsPath(dfsPath); + FileSystem fs = HadoopUtil.getFileSystem(filePath); + ArrayList<FileStatus> allFiles = new ArrayList<>(); + FileStatus status = fs.getFileStatus(new Path(filePath)); + if (status.isFile()) { + allFiles.add(status); + } else { + FileStatus[] listStatus = fs.listStatus(new Path(filePath)); + for (FileStatus f : listStatus) { + if (f.isFile()) + allFiles.add(f); + } + } + for (FileStatus f : allFiles) { + DFSFileTableReader reader = new DFSFileTableReader(f.getPath().toString(), -1); + readers.add(reader); + } + } catch (IOException e) { + e.printStackTrace(); + } + + return new SortedColumnDFSFileReader(readers, comparator); + } + + @Override + public TableSignature getSignature() throws IOException { + return dfsFileTable.getSignature(); + } + + private Comparator<String> getComparatorByType(DataType type) { + Comparator<String> comparator; + if (!type.isNumberFamily()) { + comparator = new ByteComparator<>(new StringBytesConverter()); + } else if (type.isIntegerFamily()) { + comparator = new Comparator<String>() { + @Override + public int compare(String str1, String str2) { + try { + Long num1 = Long.parseLong(str1); + Long num2 = Long.parseLong(str2); + return num1.compareTo(num2); + } catch (NumberFormatException e) { + logger.error("NumberFormatException when parse integer family number.str1:" + str1 + " str2:" + str2); + e.printStackTrace(); + return 0; + } + } + }; + } else { + comparator = new Comparator<String>() { + @Override + public int compare(String str1, String str2) { + try { + Double num1 = Double.parseDouble(str1); + Double num2 = Double.parseDouble(str2); + return num1.compareTo(num2); + } catch (NumberFormatException e) { + logger.error("NumberFormatException when parse doul family number.str1:" + str1 + " str2:" + str2); + return 0; + } + } + }; + } + return comparator; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java new file mode 100644 index 0000000..77719ff --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.engine.mr; + +import org.apache.kylin.source.ReadableTable; + +import java.io.IOException; +import java.util.Collection; +import java.util.Comparator; +import java.util.PriorityQueue; + +/** + * Created by xiefan on 16-11-22. + */ +public class SortedColumnDFSFileReader implements ReadableTable.TableReader { + private Collection<ReadableTable.TableReader> readers; + + @SuppressWarnings("unused") + private Comparator<String> comparator; + + private PriorityQueue<ReaderBuffer> pq; + + private String[] row; + + public SortedColumnDFSFileReader(Collection<ReadableTable.TableReader> readers, final Comparator<String> comparator) { + this.readers = readers; + this.comparator = comparator; + pq = new PriorityQueue<ReaderBuffer>(11, new Comparator<ReaderBuffer>() { + @Override + public int compare(ReaderBuffer i, ReaderBuffer j) { + boolean isEmpty1 = i.empty(); + boolean isEmpty2 = j.empty(); + if (isEmpty1 && isEmpty2) + return 0; + if (isEmpty1 && !isEmpty2) + return 1; + if (!isEmpty1 && isEmpty2) + return -1; + return comparator.compare(i.peek()[0], j.peek()[0]); + } + }); + for (ReadableTable.TableReader reader : readers) { + if (reader != null) { + try { + pq.add(new ReaderBuffer(reader)); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + @Override + public boolean next() throws IOException { + while (pq.size() > 0) { + ReaderBuffer buffer = pq.poll(); + String[] minEntry = buffer.pop(); + this.row = minEntry; + if (buffer.empty()) { + pq.remove(buffer); + } else { + pq.add(buffer); // add it back + } + if (this.row == null) { //avoid the case of empty file + return false; + } + return true; + } + return false; + } + + @Override + public String[] getRow() { + return this.row; + } + + @Override + public void close() throws IOException { + for (ReadableTable.TableReader reader : readers) + reader.close(); + } + + static class ReaderBuffer { + private ReadableTable.TableReader reader; + + private String[] row; + + public ReaderBuffer(ReadableTable.TableReader reader) throws IOException { + this.reader = reader; + reload(); + } + + public void close() throws IOException { + if (this.reader != null) + reader.close(); + } + + public boolean empty() { + return (this.row == null); + } + + public String[] peek() { + return this.row; + } + + public String[] pop() throws IOException { + String[] result = this.row; + reload(); + return result; + } + + private void reload() throws IOException { + if (reader.next()) { + row = reader.getRow(); + } else { + this.row = null; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnReader.java deleted file mode 100644 index 215198f..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnReader.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.engine.mr; - -import org.apache.kylin.source.ReadableTable; - -import java.io.IOException; -import java.util.Collection; -import java.util.Comparator; -import java.util.PriorityQueue; - -/** - * Created by xiefan on 16-11-22. - */ -public class SortedColumnReader implements ReadableTable.TableReader { - private Collection<ReadableTable.TableReader> readers; - - private Comparator<String> comparator; - - private PriorityQueue<ReaderBuffer> pq; - - private String[] row; - - public SortedColumnReader(Collection<ReadableTable.TableReader> readers, final Comparator<String> comparator) { - this.readers = readers; - this.comparator = comparator; - pq = new PriorityQueue<ReaderBuffer>(11, new Comparator<ReaderBuffer>() { - @Override - public int compare(ReaderBuffer i, ReaderBuffer j) { - boolean isEmpty1 = i.empty(); - boolean isEmpty2 = j.empty(); - if (isEmpty1 && isEmpty2) - return 0; - if (isEmpty1 && !isEmpty2) - return 1; - if (!isEmpty1 && isEmpty2) - return -1; - return comparator.compare(i.peek()[0], j.peek()[0]); - } - }); - for (ReadableTable.TableReader reader : readers) { - if (reader != null) { - try { - pq.add(new ReaderBuffer(reader)); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - @Override - public boolean next() throws IOException { - while (pq.size() > 0) { - ReaderBuffer buffer = pq.poll(); - String[] minEntry = buffer.pop(); - this.row = minEntry; - if (buffer.empty()) { - pq.remove(buffer); - } else { - pq.add(buffer); // add it back - } - if (this.row == null) { //avoid the case of empty file - return false; - } - return true; - } - return false; - } - - @Override - public String[] getRow() { - return this.row; - } - - @Override - public void close() throws IOException { - for (ReadableTable.TableReader reader : readers) - reader.close(); - } - - static class ReaderBuffer { - public ReaderBuffer(ReadableTable.TableReader reader) throws IOException { - this.reader = reader; - reload(); - } - - public void close() throws IOException { - if (this.reader != null) - reader.close(); - } - - public boolean empty() { - return (this.row == null); - } - - public String[] peek() { - return this.row; - } - - public String[] pop() throws IOException { - String[] result = this.row; - reload(); - return result; - } - - private void reload() throws IOException { - if (reader.next()) { - row = reader.getRow(); - } else { - this.row = null; - } - } - - private ReadableTable.TableReader reader; - - private String[] row; - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 7447133..5d7cb21 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -23,14 +23,13 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.cli.DictionaryGeneratorCLI; import org.apache.kylin.dict.DistinctColumnValuesProvider; -import org.apache.kylin.engine.mr.SortedColumn; +import org.apache.kylin.engine.mr.SortedColumnDFSFile; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; /** * @author ysong1 - * */ public class CreateDictionaryJob extends AbstractHadoopJob { @@ -54,8 +53,7 @@ public class CreateDictionaryJob extends AbstractHadoopJob { DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() { @Override public ReadableTable getDistinctValuesFor(TblColRef col) { - //return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1); - return new SortedColumn(factColumnsInputPath + "/" + col.getName(), col.getType()); + return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getName(), col.getType()); } }); http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 9722b9c..6e24d61 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -66,7 +66,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK private List<ByteArray> colValues; private TblColRef col = null; private boolean isStatistics = false; - private boolean isPartitionCol = false; private KylinConfig cubeConfig; private int uhcReducerCount; private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>(); @@ -103,13 +102,11 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } else if (collectStatistics && (taskId == numberOfTasks - 2)) { // partition col isStatistics = false; - isPartitionCol = true; col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); colValues = Lists.newLinkedList(); } else { // col isStatistics = false; - isPartitionCol = false; col = columnList.get(ReducerIdToColumnIndex.get(taskId)); colValues = Lists.newLinkedList(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 7776172..762047b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -30,13 +30,12 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.model.TblColRef; import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.TblColRef; /** */ @@ -156,14 +155,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset); sortableKey.setText(outputKey); //judge type - DataType type = factDictCols.get(i).getType(); - if (!type.isNumberFamily()) { - sortableKey.setTypeId((byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal()); - } else if (type.isIntegerFamily()) { - sortableKey.setTypeId((byte) TypeFlag.INTEGER_FAMILY_TYPE.ordinal()); - } else { - sortableKey.setTypeId((byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal()); - } + sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType()); context.write(sortableKey, EMPTY_TEXT); } } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java index 4ca3a90..b804eef 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKey.java @@ -17,9 +17,9 @@ */ package org.apache.kylin.engine.mr.steps; - import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; +import org.apache.kylin.metadata.datatype.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,11 +27,15 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; - /** * Created by xiefan on 16-11-1. */ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSortableKey> { + public enum TypeFlag { + NONE_NUMERIC_TYPE, + INTEGER_FAMILY_TYPE, + DOUBLE_FAMILY_TYPE + } private byte typeId; //non-numeric(0000 0000) int(0000 0001) other numberic(0000 0010) @@ -61,7 +65,7 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta logger.error("none numeric value!"); return 0; } - if (o.isIntegerFamily()) { //integer type + if (o.isIntegerFamily()) { //integer type try { Long num1 = Long.parseLong(str1); Long num2 = Long.parseLong(str2); @@ -71,7 +75,7 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta e.printStackTrace(); return 0; } - } else { //other numeric type + } else { //other numeric type try { Double num1 = Double.parseDouble(str1); Double num2 = Double.parseDouble(str2); @@ -106,7 +110,8 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta } public boolean isNumberFamily() { - if (typeId == TypeFlag.NONE_NUMERIC_TYPE.ordinal()) return false; + if (typeId == TypeFlag.NONE_NUMERIC_TYPE.ordinal()) + return false; return true; } @@ -118,6 +123,16 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta return (typeId == TypeFlag.DOUBLE_FAMILY_TYPE.ordinal()); } + public void setTypeIdByDatatype(DataType type) { + if (!type.isNumberFamily()) { + this.typeId = (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal(); + } else if (type.isIntegerFamily()) { + this.typeId = (byte) TypeFlag.INTEGER_FAMILY_TYPE.ordinal(); + } else { + this.typeId = (byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal(); + } + } + public void setTypeId(byte typeId) { this.typeId = typeId; } http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/TypeFlag.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/TypeFlag.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/TypeFlag.java deleted file mode 100644 index 3279106..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/TypeFlag.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.steps; - -/** - * Created by xiefan on 16-11-2. - */ -public enum TypeFlag { - NONE_NUMERIC_TYPE, - INTEGER_FAMILY_TYPE, - DOUBLE_FAMILY_TYPE -} http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 6e18f9b..eb06f07 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -29,7 +29,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.engine.mr.SortedColumn; +import org.apache.kylin.engine.mr.SortedColumnDFSFile; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -101,7 +101,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); //final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1); - final ReadableTable readableTable = new SortedColumn(factDistinctPath + "/" + partitionCol.getName(), partitionCol.getType()); + final ReadableTable readableTable = new SortedColumnDFSFile(factDistinctPath + "/" + partitionCol.getName(), partitionCol.getType()); final ReadableTable.TableReader tableReader = readableTable.getReader(); long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE; try { http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java index 2f2170c..6a8937f 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java @@ -55,7 +55,7 @@ public class SortedColumnReaderTest { StringBytesConverter converter = new StringBytesConverter(); ArrayList<String> correctAnswer = readAllFiles(dirPath); Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter())); - SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("varchar")); + SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/",DataType.getType("varchar")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1); ArrayList<String> output = new ArrayList<>(); while(e.moveNext()){ @@ -130,7 +130,7 @@ public class SortedColumnReaderTest { } } }); - SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("long")); + SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/",DataType.getType("long")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1); ArrayList<String> output = new ArrayList<>(); while(e.moveNext()){ @@ -147,7 +147,7 @@ public class SortedColumnReaderTest { @Test public void testEmptyDir() throws Exception{ String dirPath = "src/test/resources/empty_dir"; - SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("varchar")); + SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/",DataType.getType("varchar")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1); ArrayList<String> output = new ArrayList<>(); while(e.moveNext()){ @@ -164,7 +164,7 @@ public class SortedColumnReaderTest { final BytesConverter<String> converter = new StringBytesConverter(); Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter())); System.out.println("correct answer:"+correctAnswer); - SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("varchar")); + SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/",DataType.getType("varchar")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1); ArrayList<String> output = new ArrayList<>(); while(e.moveNext()){ @@ -237,7 +237,7 @@ public class SortedColumnReaderTest { } } }); - SortedColumn column = new SortedColumn(dirPath + "/",DataType.getType("double")); + SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/",DataType.getType("double")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(),-1); ArrayList<String> output = new ArrayList<>(); while(e.moveNext()){ http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java index 677e386..12105c9 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java @@ -23,6 +23,7 @@ import org.apache.kylin.dict.NumberDictionaryBuilder; import org.apache.kylin.dict.NumberDictionaryForestBuilder; import org.apache.kylin.dict.StringBytesConverter; import org.apache.kylin.dict.TrieDictionaryForest; +import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey.TypeFlag; import org.junit.Ignore; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java index b03514c..283a0f5 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java @@ -12,6 +12,7 @@ import java.util.UUID; import org.apache.hadoop.io.Text; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey.TypeFlag; import org.junit.Test; /** @@ -20,11 +21,11 @@ import org.junit.Test; public class SelfDefineSortableKeyTest { @Test - public void testSortLong(){ + public void testSortLong() { Random rand = new Random(System.currentTimeMillis()); ArrayList<Long> longList = new ArrayList<>(); int count = 10; - for(int i=0;i<count;i++){ + for (int i = 0; i < count; i++) { longList.add(rand.nextLong()); } longList.add(0L); @@ -33,14 +34,14 @@ public class SelfDefineSortableKeyTest { longList.add(Long.MAX_VALUE); longList.add(Long.MIN_VALUE); - System.out.println("test numbers:"+longList); + System.out.println("test numbers:" + longList); ArrayList<String> strNumList = listToStringList(longList); //System.out.println("test num strs list:"+strNumList); - ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.INTEGER_FAMILY_TYPE.ordinal()); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) TypeFlag.INTEGER_FAMILY_TYPE.ordinal()); System.out.println(keyList.get(0).isIntegerFamily()); Collections.sort(keyList); ArrayList<String> strListAftereSort = new ArrayList<>(); - for(SelfDefineSortableKey key : keyList){ + for (SelfDefineSortableKey key : keyList) { String str = printKey(key); strListAftereSort.add(str); } @@ -55,11 +56,11 @@ public class SelfDefineSortableKeyTest { } @Test - public void testSortDouble(){ + public void testSortDouble() { Random rand = new Random(System.currentTimeMillis()); ArrayList<Double> doubleList = new ArrayList<>(); int count = 10; - for(int i=0;i<count;i++){ + for (int i = 0; i < count; i++) { doubleList.add(rand.nextDouble()); } doubleList.add(0.0); @@ -69,14 +70,14 @@ public class SelfDefineSortableKeyTest { doubleList.add(-Double.MAX_VALUE); //System.out.println(Double.MIN_VALUE); - System.out.println("test numbers:"+doubleList); + System.out.println("test numbers:" + doubleList); ArrayList<String> strNumList = listToStringList(doubleList); //System.out.println("test num strs list:"+strNumList); - ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal()); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal()); System.out.println(keyList.get(0).isOtherNumericFamily()); Collections.sort(keyList); ArrayList<String> strListAftereSort = new ArrayList<>(); - for(SelfDefineSortableKey key : keyList){ + for (SelfDefineSortableKey key : keyList) { String str = printKey(key); strListAftereSort.add(str); } @@ -91,10 +92,10 @@ public class SelfDefineSortableKeyTest { } @Test - public void testSortNormalString(){ + public void testSortNormalString() { int count = 10; ArrayList<String> strList = new ArrayList<>(); - for(int i=0;i<count;i++){ + for (int i = 0; i < count; i++) { UUID uuid = UUID.randomUUID(); strList.add(uuid.toString()); } @@ -102,28 +103,28 @@ public class SelfDefineSortableKeyTest { strList.add("hello"); //duplicate strList.add("123"); strList.add(""); - ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, (byte)TypeFlag.NONE_NUMERIC_TYPE.ordinal()); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal()); System.out.println(keyList.get(0).isOtherNumericFamily()); Collections.sort(keyList); ArrayList<String> strListAftereSort = new ArrayList<>(); - for(SelfDefineSortableKey key : keyList){ + for (SelfDefineSortableKey key : keyList) { String str = printKey(key); strListAftereSort.add(str); } assertTrue(isIncreasedOrder(strListAftereSort, new Comparator<String>() { @Override public int compare(String o1, String o2) { - return o1.compareTo(o2); + return o1.compareTo(o2); } })); } @Test - public void testIllegalNumber(){ + public void testIllegalNumber() { Random rand = new Random(System.currentTimeMillis()); ArrayList<Double> doubleList = new ArrayList<>(); int count = 10; - for(int i=0;i<count;i++){ + for (int i = 0; i < count; i++) { doubleList.add(rand.nextDouble()); } doubleList.add(0.0); @@ -133,84 +134,74 @@ public class SelfDefineSortableKeyTest { doubleList.add(-Double.MAX_VALUE); //System.out.println(Double.MIN_VALUE); - System.out.println("test numbers:"+doubleList); + System.out.println("test numbers:" + doubleList); ArrayList<String> strNumList = listToStringList(doubleList); strNumList.add("fjaeif"); //illegal type //System.out.println("test num strs list:"+strNumList); - ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal()); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal()); System.out.println(keyList.get(0).isOtherNumericFamily()); Collections.sort(keyList); - for(SelfDefineSortableKey key : keyList){ + for (SelfDefineSortableKey key : keyList) { printKey(key); } } @Test - public void testEnum(){ + public void testEnum() { TypeFlag flag = TypeFlag.DOUBLE_FAMILY_TYPE; - System.out.println((byte)flag.ordinal()); - int t = (byte)flag.ordinal(); + System.out.println((byte) flag.ordinal()); + int t = (byte) flag.ordinal(); System.out.println(t); } - - - private<T> ArrayList<String> listToStringList(ArrayList<T> list){ + private <T> ArrayList<String> listToStringList(ArrayList<T> list) { ArrayList<String> strList = new ArrayList<>(); - for(T t : list){ + for (T t : list) { System.out.println(t.toString()); strList.add(t.toString()); } return strList; } - private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList,byte typeFlag){ + private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList, byte typeFlag) { int partationId = 0; ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>(); - for(String str : strNumList){ + for (String str : strNumList) { ByteBuffer keyBuffer = ByteBuffer.allocate(4096); int offset = keyBuffer.position(); keyBuffer.put(Bytes.toBytes(partationId)[3]); keyBuffer.put(Bytes.toBytes(str)); - //System.out.println(Arrays.toString(keyBuffer.array())); - byte[] valueField = Bytes.copy(keyBuffer.array(),1,keyBuffer.position()-offset-1); - //System.out.println("new string:"+new String(valueField)); - //System.out.println("arrays toString:"+Arrays.toString(valueField)); + Bytes.copy(keyBuffer.array(), 1, keyBuffer.position() - offset - 1); Text outputKey = new Text(); - outputKey.set(keyBuffer.array(),offset,keyBuffer.position()-offset); - SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(typeFlag,outputKey); + outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset); + SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(typeFlag, outputKey); keyList.add(sortableKey); } return keyList; } - private String printKey(SelfDefineSortableKey key){ - byte[] data = key.getText().getBytes(); - byte[] fieldValue = Bytes.copy(data,1,data.length-1); - System.out.println("type flag:"+key.getTypeId()+" fieldValue:"+new String(fieldValue)); - return new String(fieldValue); - } - - private String getFieldValue(SelfDefineSortableKey key){ + private String printKey(SelfDefineSortableKey key) { byte[] data = key.getText().getBytes(); - byte[] fieldValue = Bytes.copy(data,1,data.length-1); + byte[] fieldValue = Bytes.copy(data, 1, data.length - 1); + System.out.println("type flag:" + key.getTypeId() + " fieldValue:" + new String(fieldValue)); return new String(fieldValue); } - private<T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp){ + private <T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp) { int flag; T previous = null; - for(T t : list){ - if(previous == null) previous = t; - else{ - flag = comp.compare(previous,t); - if(flag > 0) return false; + for (T t : list) { + if (previous == null) + previous = t; + else { + flag = comp.compare(previous, t); + if (flag > 0) + return false; previous = t; } } return true; } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/19184d2e/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java index 3f9ce01..8309a8c 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java @@ -82,9 +82,6 @@ public class HiveTableReader implements TableReader { } this.numberOfSplits = readCntxt.numSplits(); - - // HCatTableInfo tableInfo = HCatTableInfo. - // HCatSchema schema = HCatBaseInputFormat.getTableSchema(context.getConfiguration); } @Override