KYLIN-2640 refactor ISource API
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a0edfef Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a0edfef Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a0edfef Branch: refs/heads/KYLIN-2606 Commit: 0a0edfefb96689ba06bb89a657d7c65711894ebb Parents: 4766c78 Author: Li Yang <liy...@apache.org> Authored: Tue May 23 14:55:09 2017 +0800 Committer: hongbin ma <m...@kyligence.io> Committed: Tue May 23 17:30:18 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../java/org/apache/kylin/cube/CubeManager.java | 24 +- .../java/org/apache/kylin/cube/CubeSegment.java | 14 +- .../kylin/cube/cli/DictionaryGeneratorCLI.java | 8 +- .../org/apache/kylin/cube/util/CubingUtils.java | 4 +- .../org/apache/kylin/dict/DictionaryInfo.java | 2 +- .../apache/kylin/dict/DictionaryManager.java | 14 +- .../dict/DistinctColumnValuesProvider.java | 4 +- .../kylin/dict/TableColumnValueEnumerator.java | 6 +- .../dict/TableColumnValueSortedEnumerator.java | 14 +- .../kylin/dict/lookup/LookupStringTable.java | 4 +- .../apache/kylin/dict/lookup/LookupTable.java | 8 +- .../kylin/dict/lookup/SnapshotManager.java | 8 +- .../apache/kylin/dict/lookup/SnapshotTable.java | 8 +- .../apache/kylin/dict/MockupReadableTable.java | 8 +- .../org/apache/kylin/source/IReadableTable.java | 146 ++++++++++++ .../java/org/apache/kylin/source/ISource.java | 29 ++- .../kylin/source/ISourceMetadataExplorer.java | 36 +++ .../org/apache/kylin/source/ReadableTable.java | 145 ------------ .../org/apache/kylin/source/SourceFactory.java | 15 +- .../apache/kylin/source/SourcePartition.java | 8 + .../storage/gtrecord/CubeTupleConverter.java | 4 +- .../apache/kylin/engine/mr/DFSFileTable.java | 4 +- .../kylin/engine/mr/DFSFileTableReader.java | 2 +- .../org/apache/kylin/engine/mr/IMRInput.java | 6 +- .../kylin/engine/mr/SortedColumnDFSFile.java | 4 +- 
.../engine/mr/SortedColumnDFSFileReader.java | 16 +- .../engine/mr/steps/CreateDictionaryJob.java | 4 +- .../engine/mr/steps/MergeCuboidMapperTest.java | 2 +- .../engine/spark/KylinKryoRegistrator.java | 2 +- .../kylin/cube/ITDictionaryManagerTest.java | 4 +- .../kylin/provision/BuildCubeWithEngine.java | 4 +- .../kylin/provision/BuildCubeWithStream.java | 4 +- .../hive/ITHiveSourceTableLoaderTest.java | 29 ++- .../source/hive/ITSnapshotManagerTest.java | 6 +- server-base/pom.xml | 2 + .../kylin/rest/controller/CubeController.java | 2 +- .../kylin/rest/controller/TableController.java | 2 +- .../rest/controller2/TableControllerV2.java | 8 +- .../kylin/rest/job/StorageCleanupJob.java | 8 +- .../apache/kylin/rest/service/JobService.java | 4 +- .../rest/service/TableSchemaUpdateChecker.java | 207 ++++++++++++++++ .../apache/kylin/rest/service/TableService.java | 105 +++++++-- .../kylin/rest/service/TableServiceV2.java | 12 +- .../kylin/source/hive/HiveMetadataExplorer.java | 116 +++++++++ .../apache/kylin/source/hive/HiveSource.java | 38 +-- .../source/hive/HiveSourceTableLoader.java | 147 ------------ .../org/apache/kylin/source/hive/HiveTable.java | 4 +- .../kylin/source/hive/HiveTableReader.java | 2 +- .../apache/kylin/source/hive/SchemaChecker.java | 234 ------------------- .../apache/kylin/source/kafka/KafkaSource.java | 46 +++- 51 files changed, 816 insertions(+), 721 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 8930129..abe142a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -474,6 +474,10 @@ abstract public class KylinConfigBase implements Serializable { // SOURCE.HIVE // ============================================================================ + public int getDefaultSource() { + return Integer.parseInt(getOptional("kylin.source.default", "0")); + } + public Map<Integer, String> getSourceEngines() { Map<Integer, String> r = Maps.newLinkedHashMap(); // ref constants in ISourceAware http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 0a94fb2..e6cd761 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -66,7 +66,7 @@ import org.apache.kylin.metadata.realization.IRealizationConstants; import org.apache.kylin.metadata.realization.IRealizationProvider; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.SourceFactory; import org.apache.kylin.source.SourcePartition; import org.slf4j.Logger; @@ -214,7 +214,7 @@ public class CubeManager implements IRealizationProvider { return result; } - public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable) throws IOException { + public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable) throws IOException { CubeDesc cubeDesc = cubeSeg.getCubeDesc(); if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) return null; @@ -226,7 +226,7 @@ public class CubeManager 
implements IRealizationProvider { return dictInfo; } - public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable, Dictionary<String> dict) throws IOException { + public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable, Dictionary<String> dict) throws IOException { CubeDesc cubeDesc = cubeSeg.getCubeDesc(); if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) return null; @@ -281,7 +281,7 @@ public class CubeManager implements IRealizationProvider { tableDesc.setName(tableName); } - ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); @@ -459,8 +459,22 @@ public class CubeManager implements IRealizationProvider { return appendSegment(cube, sourcePartition.getStartDate(), sourcePartition.getEndDate(), sourcePartition.getStartOffset(), sourcePartition.getEndOffset(), sourcePartition.getSourcePartitionOffsetStart(), sourcePartition.getSourcePartitionOffsetEnd()); } - public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException { + CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException { checkBuildingSegment(cube); + + // fix start/end a bit + if (cube.getModel().getPartitionDesc().isPartitioned()) { + // if missing start, set it to where last time ends + if (startDate == 0 && startOffset == 0 && cube.getLastSegment() != null) { + CubeSegment last = cube.getLastSegment(); + startDate = last.isSourceOffsetsOn() ? 
0 : last.getDateRangeEnd(); + startOffset = last.isSourceOffsetsOn() ? last.getSourceOffsetEnd() : 0; + } + } else { + // full build + startDate = startOffset = endOffset = 0; + endDate = Long.MAX_VALUE; + } CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset); newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 45310f0..1b28bd8 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -126,11 +126,13 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen * returns "yyyyMMddHHmmss_yyyyMMddHHmmss" */ public static String makeSegmentName(long startDate, long endDate, long startOffset, long endOffset) { - if (startOffset != 0 || endOffset != 0) { - if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) { - return "FULL_BUILD"; - } + if (startOffset == 0 && startDate == 0 // + && (endOffset == 0 || endOffset == Long.MAX_VALUE) // + && (endDate == 0 || endDate == Long.MAX_VALUE)) { + return "FULL_BUILD"; + } + if (startOffset != 0 || endOffset != 0) { return startOffset + "_" + endOffset; } @@ -296,13 +298,13 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen String r; String dictKey = col.getIdentity(); r = getDictionaries().get(dictKey); - + // try Kylin v1.x dict key as well if (r == null) { String v1DictKey = col.getTable() + "/" + col.getName(); r = getDictionaries().get(v1DictKey); } - + return r; } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index e8c53f7..e7368e8 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -36,7 +36,7 @@ import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.SourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +60,7 @@ public class DictionaryGeneratorCLI { // dictionary for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) { logger.info("Building dictionary for " + col); - ReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider); + IReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider); if (dictProvider != null) { Dictionary<String> dict = dictProvider.getDictionary(col); if (dict != null) { @@ -99,13 +99,13 @@ public class DictionaryGeneratorCLI { } } - private static ReadableTable decideInputTable(DataModelDesc model, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) { + private static IReadableTable decideInputTable(DataModelDesc model, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) { KylinConfig config = model.getConfig(); DictionaryManager dictMgr = DictionaryManager.getInstance(config); TblColRef srcCol = dictMgr.decideSourceData(model, col); String 
srcTable = srcCol.getTable(); - ReadableTable inpTable; + IReadableTable inpTable; if (model.isFactTable(srcTable)) { inpTable = factTableValueProvider.getDistinctValuesFor(srcCol); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index 5e63f94..b1b6bce 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -41,7 +41,7 @@ import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,7 +166,7 @@ public class CubingUtils { for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) { final TblColRef tblColRef = entry.getKey(); final Dictionary<String> dictionary = entry.getValue(); - ReadableTable.TableSignature signature = new ReadableTable.TableSignature(); + IReadableTable.TableSignature signature = new IReadableTable.TableSignature(); signature.setLastModifiedTime(System.currentTimeMillis()); signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset)); signature.setSize(endOffset - startOffset); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java index a85628d..ae5c0f1 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java @@ -22,7 +22,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.source.ReadableTable.TableSignature; +import org.apache.kylin.source.IReadableTable.TableSignature; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 4c4e334..bbe6cac 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -41,8 +41,8 @@ import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; -import org.apache.kylin.source.ReadableTable.TableSignature; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.IReadableTable.TableSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -273,11 +273,11 @@ public class DictionaryManager { } } - public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable) throws IOException { + public 
DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable) throws IOException { return buildDictionary(model, col, inpTable, null); } - public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, String builderClass) throws IOException { + public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable, String builderClass) throws IOException { if (inpTable.exists() == false) return null; @@ -297,7 +297,7 @@ public class DictionaryManager { return trySaveNewDict(dictionary, dictInfo); } - private Dictionary<String> buildDictFromReadableTable(ReadableTable inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws IOException { + private Dictionary<String> buildDictFromReadableTable(IReadableTable inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws IOException { Dictionary<String> dictionary; IDictionaryValueEnumerator columnValueEnumerator = null; try { @@ -317,7 +317,7 @@ public class DictionaryManager { return dictionary; } - public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, Dictionary<String> dictionary) throws IOException { + public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, IReadableTable inpTable, Dictionary<String> dictionary) throws IOException { DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable); String dupInfo = checkDupByInfo(dictInfo); if (dupInfo != null) { @@ -328,7 +328,7 @@ public class DictionaryManager { return trySaveNewDict(dictionary, dictInfo); } - private DictionaryInfo createDictionaryInfo(DataModelDesc model, TblColRef col, ReadableTable inpTable) throws IOException { + private DictionaryInfo createDictionaryInfo(DataModelDesc model, TblColRef col, IReadableTable inpTable) throws IOException { TblColRef srcCol = decideSourceData(model, col); TableSignature inputSig = inpTable.getSignature(); if (inputSig == 
null) // table does not exists http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java index c91cbbb..242b1ff 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java @@ -19,7 +19,7 @@ package org.apache.kylin.dict; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; /** * To build dictionary, we need a list of distinct values on a column. @@ -31,5 +31,5 @@ import org.apache.kylin.source.ReadableTable; public interface DistinctColumnValuesProvider { /** Return a ReadableTable contains only one column, each row being a distinct value. 
*/ - public ReadableTable getDistinctValuesFor(TblColRef col); + public IReadableTable getDistinctValuesFor(TblColRef col); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java index 957625d..7caf686 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java @@ -21,18 +21,18 @@ package org.apache.kylin.dict; import java.io.IOException; import java.util.Arrays; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; /** * Created by dongli on 10/29/15. */ public class TableColumnValueEnumerator implements IDictionaryValueEnumerator { - private ReadableTable.TableReader reader; + private IReadableTable.TableReader reader; private int colIndex; private String colValue; - public TableColumnValueEnumerator(ReadableTable.TableReader reader, int colIndex) { + public TableColumnValueEnumerator(IReadableTable.TableReader reader, int colIndex) { this.reader = reader; this.colIndex = colIndex; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java index c57cc5e..8f9f74f 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java +++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java @@ -24,14 +24,14 @@ import java.util.Collection; import java.util.Comparator; import java.util.PriorityQueue; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; /** * Created by xiefan46 on 11/14/16. */ public class TableColumnValueSortedEnumerator implements IDictionaryValueEnumerator { - private Collection<ReadableTable.TableReader> readers; + private Collection<IReadableTable.TableReader> readers; private int colIndex; @@ -41,7 +41,7 @@ public class TableColumnValueSortedEnumerator implements IDictionaryValueEnumera private PriorityQueue<ReaderBuffer> pq; - public TableColumnValueSortedEnumerator(Collection<ReadableTable.TableReader> readers, int colIndex, final Comparator<String> comparator) { + public TableColumnValueSortedEnumerator(Collection<IReadableTable.TableReader> readers, int colIndex, final Comparator<String> comparator) { this.readers = readers; this.colIndex = colIndex; this.comparator = comparator; @@ -59,7 +59,7 @@ public class TableColumnValueSortedEnumerator implements IDictionaryValueEnumera return comparator.compare(i.peek(), j.peek()); } }); - for (ReadableTable.TableReader reader : readers) { + for (IReadableTable.TableReader reader : readers) { if (reader != null) { try { pq.add(new ReaderBuffer(reader)); @@ -92,7 +92,7 @@ public class TableColumnValueSortedEnumerator implements IDictionaryValueEnumera @Override public void close() throws IOException { - for (ReadableTable.TableReader reader : readers) { + for (IReadableTable.TableReader reader : readers) { if (reader != null) reader.close(); } @@ -104,7 +104,7 @@ public class TableColumnValueSortedEnumerator implements IDictionaryValueEnumera } final class ReaderBuffer { - public ReaderBuffer(ReadableTable.TableReader reader) throws IOException { + public ReaderBuffer(IReadableTable.TableReader reader) throws IOException { this.reader = reader; reload(); 
} @@ -148,7 +148,7 @@ public class TableColumnValueSortedEnumerator implements IDictionaryValueEnumera private String cache; - private ReadableTable.TableReader reader; + private IReadableTable.TableReader reader; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java index 4b96622..886de22 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java @@ -25,7 +25,7 @@ import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; /** * @author yangli9 @@ -61,7 +61,7 @@ public class LookupStringTable extends LookupTable<String> { boolean[] colIsDateTime; boolean[] colIsNumber; - public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { + public LookupStringTable(TableDesc tableDesc, String[] keyColumns, IReadableTable table) throws IOException { super(tableDesc, keyColumns, table); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java index 90ca500..a99ef29 100644 --- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java @@ -31,8 +31,8 @@ import org.apache.commons.io.IOUtils; import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.Pair; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; -import org.apache.kylin.source.ReadableTable.TableReader; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.IReadableTable.TableReader; import com.google.common.collect.Sets; @@ -46,10 +46,10 @@ abstract public class LookupTable<T> { protected TableDesc tableDesc; protected String[] keyColumns; - protected ReadableTable table; + protected IReadableTable table; protected Map<Array<T>, T[]> data; - public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { + public LookupTable(TableDesc tableDesc, String[] keyColumns, IReadableTable table) throws IOException { this.tableDesc = tableDesc; this.keyColumns = keyColumns; this.table = table; http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java index a912696..b997a88 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java @@ -29,8 +29,8 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; -import 
org.apache.kylin.source.ReadableTable.TableSignature; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.IReadableTable.TableSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,7 +121,7 @@ public class SnapshotManager { snapshotCache.invalidate(resourcePath); } - public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException { + public SnapshotTable buildSnapshot(IReadableTable table, TableDesc tableDesc) throws IOException { SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); snapshot.updateRandomUuid(); @@ -141,7 +141,7 @@ public class SnapshotManager { return trySaveNewSnapshot(snapshot); } - public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException { + public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException { SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); snapshot.setUuid(overwriteUUID); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java index 9d38dba..1d7e474 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java @@ -38,7 +38,7 @@ import org.apache.kylin.dict.TrieDictionary; import org.apache.kylin.dict.TrieDictionaryBuilder; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import 
com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; @@ -49,7 +49,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; */ @SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class SnapshotTable extends RootPersistentEntity implements ReadableTable { +public class SnapshotTable extends RootPersistentEntity implements IReadableTable { @JsonProperty("tableName") private String tableName; @@ -65,13 +65,13 @@ public class SnapshotTable extends RootPersistentEntity implements ReadableTable public SnapshotTable() { } - SnapshotTable(ReadableTable table, String tableName) throws IOException { + SnapshotTable(IReadableTable table, String tableName) throws IOException { this.tableName = tableName; this.signature = table.getSignature(); this.useDictionary = true; } - public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException { + public void takeSnapshot(IReadableTable table, TableDesc tableDesc) throws IOException { this.signature = table.getSignature(); int maxIndex = tableDesc.getMaxColumnIndex(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java b/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java index 4ee279f..53c9476 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java @@ -22,11 +22,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.kylin.source.ReadableTable; +import 
org.apache.kylin.source.IReadableTable; -public class MockupReadableTable implements ReadableTable { +public class MockupReadableTable implements IReadableTable { - public static ReadableTable newSingleColumnTable(String path, String... values) { + public static IReadableTable newSingleColumnTable(String path, String... values) { TableSignature sig = new TableSignature(path, values.length, 0); List<String[]> content = new ArrayList<>(); for (String v : values) { @@ -35,7 +35,7 @@ public class MockupReadableTable implements ReadableTable { return new MockupReadableTable(content, sig, true); } - public static ReadableTable newNonExistTable(String path) { + public static IReadableTable newNonExistTable(String path) { TableSignature sig = new TableSignature(path, -1, 0); return new MockupReadableTable(null, sig, false); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/IReadableTable.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/IReadableTable.java b/core-metadata/src/main/java/org/apache/kylin/source/IReadableTable.java new file mode 100644 index 0000000..a0d0642 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/source/IReadableTable.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A table that can be read. + */ +public interface IReadableTable { + + /** + * Returns a reader to read the table. + */ + public TableReader getReader() throws IOException; + + /** + * Used to detect table modifications. + */ + public TableSignature getSignature() throws IOException; + + public boolean exists() throws IOException; + + public interface TableReader extends Closeable { + + /** + * Move to the next row, return false if no more record. + */ + public boolean next() throws IOException; + + /** + * Get the current row. 
+ */ + public String[] getRow(); + + } + + // ============================================================================ + + @SuppressWarnings("serial") + @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) + public class TableSignature implements Serializable { + + @JsonProperty("path") + private String path; + @JsonProperty("size") + private long size; + @JsonProperty("last_modified_time") + private long lastModifiedTime; + + // for JSON serialization + public TableSignature() { + } + + public TableSignature(String path, long size, long lastModifiedTime) { + super(); + this.path = path; + this.size = size; + this.lastModifiedTime = lastModifiedTime; + } + + public void setPath(String path) { + this.path = path; + } + + public void setSize(long size) { + this.size = size; + } + + public void setLastModifiedTime(long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; + } + + public String getPath() { + return path; + } + + public long getSize() { + return size; + } + + public long getLastModifiedTime() { + return lastModifiedTime; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32)); + result = prime * result + ((path == null) ? 
0 : path.hashCode()); + result = prime * result + (int) (size ^ (size >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TableSignature other = (TableSignature) obj; + if (lastModifiedTime != other.lastModifiedTime) + return false; + if (path == null) { + if (other.path != null) + return false; + } else if (!path.equals(other.path)) + return false; + if (size != other.size) + return false; + return true; + } + + @Override + public String toString() { + return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/ISource.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index 5bff8a7..302c53c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -18,18 +18,33 @@ package org.apache.kylin.source; -import java.util.List; - import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; +/** + * Represents a kind of source to Kylin, like Hive. + */ public interface ISource { - <I> I adaptToBuildEngine(Class<I> engineInterface); - - ReadableTable createReadableTable(TableDesc tableDesc); + /** + * Return an explorer to sync table metadata from the data source. + */ + ISourceMetadataExplorer getSourceMetadataExplorer(); - List<String> getMRDependentResources(TableDesc table); + /** + * Return an adaptor that implements specified interface as requested by the build engine. + * The IMRInput in particular, is required by the MR build engine. 
+ */ + <I> I adaptToBuildEngine(Class<I> engineInterface); - SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition); + /** + * Return an IReadableTable that can iterate through the rows of the given table. + */ + IReadableTable createReadableTable(TableDesc tableDesc); + + /** + * Give the source a chance to enrich a SourcePartition before the build starts. + * In particular, the Kafka source uses this chance to define start/end offsets within each partition. + */ + SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/ISourceMetadataExplorer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISourceMetadataExplorer.java b/core-metadata/src/main/java/org/apache/kylin/source/ISourceMetadataExplorer.java new file mode 100644 index 0000000..b746f19 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISourceMetadataExplorer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.source; + +import java.util.List; + +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; + +public interface ISourceMetadataExplorer { + + List<String> listDatabases() throws Exception; + + List<String> listTables(String database) throws Exception; + + Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception; + + List<String> getRelatedKylinResources(TableDesc table); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/ReadableTable.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ReadableTable.java b/core-metadata/src/main/java/org/apache/kylin/source/ReadableTable.java deleted file mode 100644 index c62f1fe..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/source/ReadableTable.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.source; - -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - */ -public interface ReadableTable { - - /** - * Returns a reader to read the table. - */ - public TableReader getReader() throws IOException; - - /** - * Used to detect table modifications. - */ - public TableSignature getSignature() throws IOException; - - public boolean exists() throws IOException; - - - public interface TableReader extends Closeable { - - /** - * Move to the next row, return false if no more record. - */ - public boolean next() throws IOException; - - /** - * Get the current row. - */ - public String[] getRow(); - - } - - // ============================================================================ - - @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) - public class TableSignature implements Serializable{ - - @JsonProperty("path") - private String path; - @JsonProperty("size") - private long size; - @JsonProperty("last_modified_time") - private long lastModifiedTime; - - // for JSON serialization - public TableSignature() { - } - - public TableSignature(String path, long size, long lastModifiedTime) { - super(); - this.path = path; - this.size = size; - this.lastModifiedTime = lastModifiedTime; - } - - public void setPath(String path) { - this.path = path; - } - - public void setSize(long size) { - this.size = size; - } - - public void setLastModifiedTime(long lastModifiedTime) { - this.lastModifiedTime = lastModifiedTime; - } - - public String getPath() { - return path; - } - - public long getSize() { - return size; - } - - public long getLastModifiedTime() { - return lastModifiedTime; - } - - @Override - public int 
hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32)); - result = prime * result + ((path == null) ? 0 : path.hashCode()); - result = prime * result + (int) (size ^ (size >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - TableSignature other = (TableSignature) obj; - if (lastModifiedTime != other.lastModifiedTime) - return false; - if (path == null) { - if (other.path != null) - return false; - } else if (!path.equals(other.path)) - return false; - if (size != other.size) - return false; - return true; - } - - @Override - public String toString() { - return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]"; - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java index 5ce9014..86f89b8 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java @@ -33,21 +33,26 @@ public class SourceFactory { Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getSourceEngines(); sources = new ImplementationSwitch<>(impls, ISource.class); } + + public static ISource getDefaultSource() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + return sources.get(config.getDefaultSource()); + } - public static ISource tableSource(ISourceAware aware) { + public static ISource getSource(ISourceAware aware) { return sources.get(aware.getSourceType()); } - public static ReadableTable 
createReadableTable(TableDesc table) { - return tableSource(table).createReadableTable(table); + public static IReadableTable createReadableTable(TableDesc table) { + return getSource(table).createReadableTable(table); } public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) { - return tableSource(table).adaptToBuildEngine(engineInterface); + return getSource(table).adaptToBuildEngine(engineInterface); } public static List<String> getMRDependentResources(TableDesc table) { - return tableSource(table).getMRDependentResources(table); + return getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java index e489704..43e46c6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java @@ -24,6 +24,14 @@ import java.util.Map; import com.google.common.base.Objects; /** + * Defines a set of source records that will be built into a cube segment. + * + * There are two main approaches: + * 1) by a date range, in case of time partitioned tables like Hive. + * 2) by an offset range, in case of offset based source like Kafka. + * + * For the offset approach, the source can further be partitioned and each partition can define + * its own start and end offset within that partition. 
*/ public class SourcePartition { long startDate; http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java index 7df80ca..73fc380 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java @@ -42,7 +42,7 @@ import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -288,7 +288,7 @@ public class CubeTupleConverter implements ITupleConverter { private static class EnhancedStringLookupTable extends LookupStringTable { - public EnhancedStringLookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { + public EnhancedStringLookupTable(TableDesc tableDesc, String[] keyColumns, IReadableTable table) throws IOException { super(tableDesc, keyColumns, table); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java index 19bc021..c036445 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java @@ 
-28,11 +28,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; /** */ -public class DFSFileTable implements ReadableTable { +public class DFSFileTable implements IReadableTable { public static final String DELIM_AUTO = "auto"; public static final String DELIM_COMMA = ","; http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java index 8c6b5f5..0c9c3fc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java @@ -42,7 +42,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.StringSplitter; -import org.apache.kylin.source.ReadableTable.TableReader; +import org.apache.kylin.source.IReadableTable.TableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java index 8e77513..aca9853 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java @@ -18,16 +18,16 @@ package org.apache.kylin.engine.mr; 
+import java.util.Collection; + import org.apache.hadoop.mapreduce.Job; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.TableDesc; -import java.util.Collection; - /** - * Any ITableSource that wishes to serve as input of MapReduce build engine must adapt to this interface. + * Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface. */ public interface IMRInput { http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java index f396b5a..bcf4b98 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java @@ -28,7 +28,7 @@ import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.dict.ByteComparator; import org.apache.kylin.dict.StringBytesConverter; import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; * * You need to ensure that values inside each file is sorted */ -public class SortedColumnDFSFile implements ReadableTable { +public class SortedColumnDFSFile implements IReadableTable { private static final Logger logger = LoggerFactory.getLogger(SortedColumnDFSFile.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java 
---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java index 77719ff..bb00442 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java @@ -17,7 +17,7 @@ */ package org.apache.kylin.engine.mr; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import java.io.IOException; import java.util.Collection; @@ -27,8 +27,8 @@ import java.util.PriorityQueue; /** * Created by xiefan on 16-11-22. */ -public class SortedColumnDFSFileReader implements ReadableTable.TableReader { - private Collection<ReadableTable.TableReader> readers; +public class SortedColumnDFSFileReader implements IReadableTable.TableReader { + private Collection<IReadableTable.TableReader> readers; @SuppressWarnings("unused") private Comparator<String> comparator; @@ -37,7 +37,7 @@ public class SortedColumnDFSFileReader implements ReadableTable.TableReader { private String[] row; - public SortedColumnDFSFileReader(Collection<ReadableTable.TableReader> readers, final Comparator<String> comparator) { + public SortedColumnDFSFileReader(Collection<IReadableTable.TableReader> readers, final Comparator<String> comparator) { this.readers = readers; this.comparator = comparator; pq = new PriorityQueue<ReaderBuffer>(11, new Comparator<ReaderBuffer>() { @@ -54,7 +54,7 @@ public class SortedColumnDFSFileReader implements ReadableTable.TableReader { return comparator.compare(i.peek()[0], j.peek()[0]); } }); - for (ReadableTable.TableReader reader : readers) { + for (IReadableTable.TableReader reader : readers) { if (reader != null) { try { pq.add(new ReaderBuffer(reader)); @@ -91,16 +91,16 @@ public class SortedColumnDFSFileReader implements 
ReadableTable.TableReader { @Override public void close() throws IOException { - for (ReadableTable.TableReader reader : readers) + for (IReadableTable.TableReader reader : readers) reader.close(); } static class ReaderBuffer { - private ReadableTable.TableReader reader; + private IReadableTable.TableReader reader; private String[] row; - public ReaderBuffer(ReadableTable.TableReader reader) throws IOException { + public ReaderBuffer(IReadableTable.TableReader reader) throws IOException { this.reader = reader; reload(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index be8c305..98ebbb4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -41,7 +41,7 @@ import org.apache.kylin.dict.DistinctColumnValuesProvider; import org.apache.kylin.engine.mr.SortedColumnDFSFile; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +65,7 @@ public class CreateDictionaryJob extends AbstractHadoopJob { DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() { @Override - public ReadableTable getDistinctValuesFor(TblColRef col) { + public IReadableTable getDistinctValuesFor(TblColRef col) { return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getIdentity(), col.getType()); } }, new DictionaryProvider() { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java index dc593b0..04af4fe 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java @@ -43,7 +43,7 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.ProjectManager; -import org.apache.kylin.source.ReadableTable.TableSignature; +import org.apache.kylin.source.IReadableTable.TableSignature; import org.junit.After; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java index 08a8cb0..41bdd28 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java @@ -290,7 +290,7 @@ public class KylinKryoRegistrator implements KryoRegistrator { kyroClasses.add(org.apache.kylin.metadata.project.RealizationEntry.class); kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class); kyroClasses.add(org.apache.kylin.metadata.streaming.StreamingConfig.class); - 
kyroClasses.add(org.apache.kylin.source.ReadableTable.TableSignature.class); + kyroClasses.add(org.apache.kylin.source.IReadableTable.TableSignature.class); kyroClasses.add(org.apache.kylin.storage.hybrid.HybridInstance.class); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java index 458703a..4dcfdb2 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java @@ -36,7 +36,7 @@ import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.DistinctColumnValuesProvider; import org.apache.kylin.engine.mr.DFSFileTable; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -119,7 +119,7 @@ public class ITDictionaryManagerTest extends LocalFileMetadataTestCase { } @Override - public ReadableTable getDistinctValuesFor(TblColRef col) { + public IReadableTable getDistinctValuesFor(TblColRef col) { return new DFSFileTable(tmpFilePath, -1); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 58078eb..5719523 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -339,8 +339,8 @@ public class BuildCubeWithEngine { private Boolean buildSegment(String cubeName, long startDate, long endDate) throws Exception { CubeInstance cubeInstance = cubeManager.getCube(cubeName); - ISource source = SourceFactory.tableSource(cubeInstance); - SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, endDate, 0, 0, null, null)); + ISource source = SourceFactory.getSource(cubeInstance); + SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(0, endDate, 0, 0, null, null)); CubeSegment segment = cubeManager.appendSegment(cubeInstance, partition.getStartDate(), partition.getEndDate()); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 5310816..9c80413 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -281,8 +281,8 @@ public class BuildCubeWithStream { protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeInstance cubeInstance = cubeManager.getCube(cubeName); - ISource source = SourceFactory.tableSource(cubeInstance); - SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, startOffset, endOffset, null, null)); + ISource source = SourceFactory.getSource(cubeInstance); + SourcePartition partition = 
source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, startOffset, endOffset, null, null)); CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java index 7aff3ba..d972eeb 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java @@ -20,11 +20,13 @@ package org.apache.kylin.source.hive; import static org.junit.Assert.assertTrue; -import java.io.IOException; -import java.util.Set; - -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.source.ISource; +import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.ISourceMetadataExplorer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,14 +44,17 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase { } @Test - public void test() throws IOException { - KylinConfig config = getTestConfig(); - String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" }; - Set<String> loaded = HiveSourceTableLoader.loadHiveTables(toLoad, config); - - assertTrue(loaded.size() == toLoad.length); - for (String str : toLoad) - 
assertTrue(loaded.contains(str)); + public void test() throws Exception { + ISource source = SourceFactory.getDefaultSource(); + ISourceMetadataExplorer explr = source.getSourceMetadataExplorer(); + Pair<TableDesc, TableExtDesc> pair; + + pair = explr.loadTableMetadata("DEFAULT", "TEST_KYLIN_FACT"); + assertTrue(pair.getFirst().getIdentity().equals("DEFAULT.TEST_KYLIN_FACT")); + + pair = explr.loadTableMetadata("EDW", "TEST_CAL_DT"); + assertTrue(pair.getFirst().getIdentity().equals("EDW.TEST_CAL_DT")); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java index 6396749..c56c9bb 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java @@ -26,8 +26,8 @@ import org.apache.kylin.dict.lookup.SnapshotManager; import org.apache.kylin.dict.lookup.SnapshotTable; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; -import org.apache.kylin.source.ReadableTable.TableReader; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.IReadableTable.TableReader; import org.apache.kylin.source.SourceFactory; import org.junit.After; import org.junit.Before; @@ -56,7 +56,7 @@ public class ITSnapshotManagerTest extends HBaseMetadataTestCase { public void basicTest() throws Exception { String tableName = "EDW.TEST_SITES"; TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName); - ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); + IReadableTable hiveTable = 
SourceFactory.createReadableTable(tableDesc); String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath(); snapshotMgr.wipeoutCache(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/pom.xml ---------------------------------------------------------------------- diff --git a/server-base/pom.xml b/server-base/pom.xml index 3db88be..b165b99 100644 --- a/server-base/pom.xml +++ b/server-base/pom.xml @@ -47,6 +47,8 @@ </exclusion> </exclusions> </dependency> + + <!-- these plug-in modules, should not have API dependencies --> <dependency> <groupId>org.apache.kylin</groupId> <artifactId>kylin-storage-hbase</artifactId> http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 7d4cbe6..5b16995 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -307,7 +307,7 @@ public class CubeController extends BasicController { sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, submitter); } catch (Throwable e) { logger.error(e.getLocalizedMessage(), e); - throw new InternalErrorException(e.getLocalizedMessage()); + throw new InternalErrorException(e.getLocalizedMessage(), e); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java 
b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java index 9785db2..ebbfeb2 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java @@ -152,7 +152,7 @@ public class TableController extends BasicController { */ @RequestMapping(value = "/{tableNames}/cardinality", method = { RequestMethod.PUT }, produces = { "application/json" }) @ResponseBody - public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) throws IOException { + public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) throws Exception { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); String[] tables = tableNames.split(","); try { http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java b/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java index 4a4bf74..bfc5555 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java @@ -18,6 +18,8 @@ package org.apache.kylin.rest.controller2; +import java.io.IOException; + import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.rest.controller.BasicController; import org.apache.kylin.rest.exception.BadRequestException; @@ -41,8 +43,6 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; -import 
java.io.IOException; - /** * @author xduo */ @@ -92,7 +92,7 @@ public class TableControllerV2 extends BasicController { @RequestMapping(value = "/load", method = { RequestMethod.POST }, produces = { "application/vnd.apache.kylin-v2+json" }) @ResponseBody - public EnvelopeResponse loadHiveTablesV2(@RequestHeader("Accept-Language") String lang, @RequestBody HiveTableRequestV2 requestV2) throws IOException { + public EnvelopeResponse loadHiveTablesV2(@RequestHeader("Accept-Language") String lang, @RequestBody HiveTableRequestV2 requestV2) throws Exception { MsgPicker.setMsg(lang); return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, tableServiceV2.loadHiveTables(requestV2.getTables(), requestV2.getProject(), requestV2.isNeedProfile()), ""); @@ -115,7 +115,7 @@ public class TableControllerV2 extends BasicController { @RequestMapping(value = "/cardinality", method = { RequestMethod.POST }, produces = { "application/vnd.apache.kylin-v2+json" }) @ResponseBody - public void generateCardinalityV2(@RequestHeader("Accept-Language") String lang, @RequestBody HiveTableRequestV2 requestV2) throws IOException { + public void generateCardinalityV2(@RequestHeader("Accept-Language") String lang, @RequestBody HiveTableRequestV2 requestV2) throws Exception { MsgPicker.setMsg(lang); String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java index ae35de2..d92107b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java @@ -57,8 +57,8 @@ import 
org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.metadata.realization.IRealizationConstants; -import org.apache.kylin.source.hive.HiveClientFactory; -import org.apache.kylin.source.hive.IHiveClient; +import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.ISourceMetadataExplorer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -253,8 +253,8 @@ public class StorageCleanupJob extends AbstractApplication { final String preFix = "kylin_intermediate_"; final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); - List<String> hiveTableNames = hiveClient.getHiveTableNames(config.getHiveDatabaseForIntermediateTable()); + ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); + List<String> hiveTableNames = explr.listTables(config.getHiveDatabaseForIntermediateTable()); Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() { @Override public boolean apply(@Nullable String input) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 4e5f80f..fd9112b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -211,9 +211,9 @@ public class JobService extends BasicService implements InitializingBean { CubeSegment newSeg = null; try { if (buildType == CubeBuildTypeEnum.BUILD) { - ISource source = 
SourceFactory.tableSource(cube); + ISource source = SourceFactory.getSource(cube); SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); - sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition); + sourcePartition = source.enrichSourcePartitionBeforeBuild(cube, sourcePartition); newSeg = getCubeManager().appendSegment(cube, sourcePartition); job = EngineFactory.createBatchCubingJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.MERGE) {