KYLIN-2640 refactor ISource API

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a0edfef
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a0edfef
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a0edfef

Branch: refs/heads/KYLIN-2606
Commit: 0a0edfefb96689ba06bb89a657d7c65711894ebb
Parents: 4766c78
Author: Li Yang <liy...@apache.org>
Authored: Tue May 23 14:55:09 2017 +0800
Committer: hongbin ma <m...@kyligence.io>
Committed: Tue May 23 17:30:18 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../java/org/apache/kylin/cube/CubeManager.java |  24 +-
 .../java/org/apache/kylin/cube/CubeSegment.java |  14 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  |   8 +-
 .../org/apache/kylin/cube/util/CubingUtils.java |   4 +-
 .../org/apache/kylin/dict/DictionaryInfo.java   |   2 +-
 .../apache/kylin/dict/DictionaryManager.java    |  14 +-
 .../dict/DistinctColumnValuesProvider.java      |   4 +-
 .../kylin/dict/TableColumnValueEnumerator.java  |   6 +-
 .../dict/TableColumnValueSortedEnumerator.java  |  14 +-
 .../kylin/dict/lookup/LookupStringTable.java    |   4 +-
 .../apache/kylin/dict/lookup/LookupTable.java   |   8 +-
 .../kylin/dict/lookup/SnapshotManager.java      |   8 +-
 .../apache/kylin/dict/lookup/SnapshotTable.java |   8 +-
 .../apache/kylin/dict/MockupReadableTable.java  |   8 +-
 .../org/apache/kylin/source/IReadableTable.java | 146 ++++++++++++
 .../java/org/apache/kylin/source/ISource.java   |  29 ++-
 .../kylin/source/ISourceMetadataExplorer.java   |  36 +++
 .../org/apache/kylin/source/ReadableTable.java  | 145 ------------
 .../org/apache/kylin/source/SourceFactory.java  |  15 +-
 .../apache/kylin/source/SourcePartition.java    |   8 +
 .../storage/gtrecord/CubeTupleConverter.java    |   4 +-
 .../apache/kylin/engine/mr/DFSFileTable.java    |   4 +-
 .../kylin/engine/mr/DFSFileTableReader.java     |   2 +-
 .../org/apache/kylin/engine/mr/IMRInput.java    |   6 +-
 .../kylin/engine/mr/SortedColumnDFSFile.java    |   4 +-
 .../engine/mr/SortedColumnDFSFileReader.java    |  16 +-
 .../engine/mr/steps/CreateDictionaryJob.java    |   4 +-
 .../engine/mr/steps/MergeCuboidMapperTest.java  |   2 +-
 .../engine/spark/KylinKryoRegistrator.java      |   2 +-
 .../kylin/cube/ITDictionaryManagerTest.java     |   4 +-
 .../kylin/provision/BuildCubeWithEngine.java    |   4 +-
 .../kylin/provision/BuildCubeWithStream.java    |   4 +-
 .../hive/ITHiveSourceTableLoaderTest.java       |  29 ++-
 .../source/hive/ITSnapshotManagerTest.java      |   6 +-
 server-base/pom.xml                             |   2 +
 .../kylin/rest/controller/CubeController.java   |   2 +-
 .../kylin/rest/controller/TableController.java  |   2 +-
 .../rest/controller2/TableControllerV2.java     |   8 +-
 .../kylin/rest/job/StorageCleanupJob.java       |   8 +-
 .../apache/kylin/rest/service/JobService.java   |   4 +-
 .../rest/service/TableSchemaUpdateChecker.java  | 207 ++++++++++++++++
 .../apache/kylin/rest/service/TableService.java | 105 +++++++--
 .../kylin/rest/service/TableServiceV2.java      |  12 +-
 .../kylin/source/hive/HiveMetadataExplorer.java | 116 +++++++++
 .../apache/kylin/source/hive/HiveSource.java    |  38 +--
 .../source/hive/HiveSourceTableLoader.java      | 147 ------------
 .../org/apache/kylin/source/hive/HiveTable.java |   4 +-
 .../kylin/source/hive/HiveTableReader.java      |   2 +-
 .../apache/kylin/source/hive/SchemaChecker.java | 234 -------------------
 .../apache/kylin/source/kafka/KafkaSource.java  |  46 +++-
 51 files changed, 816 insertions(+), 721 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8930129..abe142a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -474,6 +474,10 @@ abstract public class KylinConfigBase implements Serializable {
     // SOURCE.HIVE
     // ============================================================================
 
+    public int getDefaultSource() {
+        return Integer.parseInt(getOptional("kylin.source.default", "0"));
+    }
+
     public Map<Integer, String> getSourceEngines() {
         Map<Integer, String> r = Maps.newLinkedHashMap();
         // ref constants in ISourceAware

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 0a94fb2..e6cd761 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -66,7 +66,7 @@ import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.metadata.realization.IRealizationProvider;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
@@ -214,7 +214,7 @@ public class CubeManager implements IRealizationProvider {
         return result;
     }
 
-    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable) throws IOException {
+    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable) throws IOException {
         CubeDesc cubeDesc = cubeSeg.getCubeDesc();
         if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
             return null;
@@ -226,7 +226,7 @@ public class CubeManager implements IRealizationProvider {
         return dictInfo;
     }
 
-    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable, Dictionary<String> dict) throws IOException {
+    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable, Dictionary<String> dict) throws IOException {
         CubeDesc cubeDesc = cubeSeg.getCubeDesc();
         if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
             return null;
@@ -281,7 +281,7 @@ public class CubeManager implements IRealizationProvider {
             tableDesc.setName(tableName);
         }
 
-        ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+        IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
         SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
         cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
@@ -459,8 +459,22 @@ public class CubeManager implements IRealizationProvider {
         return appendSegment(cube, sourcePartition.getStartDate(), sourcePartition.getEndDate(), sourcePartition.getStartOffset(), sourcePartition.getEndOffset(), sourcePartition.getSourcePartitionOffsetStart(), sourcePartition.getSourcePartitionOffsetEnd());
     }
 
-    public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException {
+    CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException {
         checkBuildingSegment(cube);
+        
+        // fix start/end a bit
+        if (cube.getModel().getPartitionDesc().isPartitioned()) {
+            // if missing start, set it to where last time ends
+            if (startDate == 0 && startOffset == 0 && cube.getLastSegment() != null) {
+                CubeSegment last = cube.getLastSegment();
+                startDate = last.isSourceOffsetsOn() ? 0 : last.getDateRangeEnd();
+                startOffset = last.isSourceOffsetsOn() ? last.getSourceOffsetEnd() : 0;
+            }
+        } else {
+            // full build
+            startDate = startOffset = endOffset = 0;
+            endDate = Long.MAX_VALUE;
+        }
 
         CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
         newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 45310f0..1b28bd8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -126,11 +126,13 @@ public class CubeSegment implements 
Comparable<CubeSegment>, IBuildable, ISegmen
      * returns "yyyyMMddHHmmss_yyyyMMddHHmmss"
      */
     public static String makeSegmentName(long startDate, long endDate, long 
startOffset, long endOffset) {
-        if (startOffset != 0 || endOffset != 0) {
-            if (startOffset == 0 && (endOffset == 0 || endOffset == 
Long.MAX_VALUE)) {
-                return "FULL_BUILD";
-            }
+        if (startOffset == 0 && startDate == 0 //
+                && (endOffset == 0 || endOffset == Long.MAX_VALUE) //
+                && (endDate == 0 || endDate == Long.MAX_VALUE)) {
+            return "FULL_BUILD";
+        }
 
+        if (startOffset != 0 || endOffset != 0) {
             return startOffset + "_" + endOffset;
         }
 
@@ -296,13 +298,13 @@ public class CubeSegment implements 
Comparable<CubeSegment>, IBuildable, ISegmen
         String r;
         String dictKey = col.getIdentity();
         r = getDictionaries().get(dictKey);
-        
+
         // try Kylin v1.x dict key as well
         if (r == null) {
             String v1DictKey = col.getTable() + "/" + col.getName();
             r = getDictionaries().get(v1DictKey);
         }
-        
+
         return r;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java 
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index e8c53f7..e7368e8 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -36,7 +36,7 @@ import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +60,7 @@ public class DictionaryGeneratorCLI {
         // dictionary
         for (TblColRef col : 
cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
             logger.info("Building dictionary for " + col);
-            ReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, 
factTableValueProvider);
+            IReadableTable inpTable = decideInputTable(cubeSeg.getModel(), 
col, factTableValueProvider);
             if (dictProvider != null) {
                 Dictionary<String> dict = dictProvider.getDictionary(col);
                 if (dict != null) {
@@ -99,13 +99,13 @@ public class DictionaryGeneratorCLI {
         }
     }
 
-    private static ReadableTable decideInputTable(DataModelDesc model, 
TblColRef col, DistinctColumnValuesProvider factTableValueProvider) {
+    private static IReadableTable decideInputTable(DataModelDesc model, 
TblColRef col, DistinctColumnValuesProvider factTableValueProvider) {
         KylinConfig config = model.getConfig();
         DictionaryManager dictMgr = DictionaryManager.getInstance(config);
         TblColRef srcCol = dictMgr.decideSourceData(model, col);
         String srcTable = srcCol.getTable();
 
-        ReadableTable inpTable;
+        IReadableTable inpTable;
         if (model.isFactTable(srcTable)) {
             inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
         } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java 
b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index 5e63f94..b1b6bce 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -41,7 +41,7 @@ import 
org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,7 +166,7 @@ public class CubingUtils {
         for (Map.Entry<TblColRef, Dictionary<String>> entry : 
dictionaryMap.entrySet()) {
             final TblColRef tblColRef = entry.getKey();
             final Dictionary<String> dictionary = entry.getValue();
-            ReadableTable.TableSignature signature = new 
ReadableTable.TableSignature();
+            IReadableTable.TableSignature signature = new 
IReadableTable.TableSignature();
             signature.setLastModifiedTime(System.currentTimeMillis());
             signature.setPath(String.format("streaming_%s_%s", startOffset, 
endOffset));
             signature.setSize(endOffset - startOffset);

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index a85628d..ae5c0f1 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -22,7 +22,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.apache.kylin.source.IReadableTable.TableSignature;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 4c4e334..bbe6cac 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -41,8 +41,8 @@ import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -273,11 +273,11 @@ public class DictionaryManager {
         }
     }
 
-    public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, 
ReadableTable inpTable) throws IOException {
+    public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, 
IReadableTable inpTable) throws IOException {
         return buildDictionary(model, col, inpTable, null);
     }
 
-    public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, 
ReadableTable inpTable, String builderClass) throws IOException {
+    public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, 
IReadableTable inpTable, String builderClass) throws IOException {
         if (inpTable.exists() == false)
             return null;
 
@@ -297,7 +297,7 @@ public class DictionaryManager {
         return trySaveNewDict(dictionary, dictInfo);
     }
 
-    private Dictionary<String> buildDictFromReadableTable(ReadableTable 
inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws 
IOException {
+    private Dictionary<String> buildDictFromReadableTable(IReadableTable 
inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws 
IOException {
         Dictionary<String> dictionary;
         IDictionaryValueEnumerator columnValueEnumerator = null;
         try {
@@ -317,7 +317,7 @@ public class DictionaryManager {
         return dictionary;
     }
 
-    public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, 
ReadableTable inpTable, Dictionary<String> dictionary) throws IOException {
+    public DictionaryInfo saveDictionary(DataModelDesc model, TblColRef col, 
IReadableTable inpTable, Dictionary<String> dictionary) throws IOException {
         DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable);
         String dupInfo = checkDupByInfo(dictInfo);
         if (dupInfo != null) {
@@ -328,7 +328,7 @@ public class DictionaryManager {
         return trySaveNewDict(dictionary, dictInfo);
     }
 
-    private DictionaryInfo createDictionaryInfo(DataModelDesc model, TblColRef 
col, ReadableTable inpTable) throws IOException {
+    private DictionaryInfo createDictionaryInfo(DataModelDesc model, TblColRef 
col, IReadableTable inpTable) throws IOException {
         TblColRef srcCol = decideSourceData(model, col);
         TableSignature inputSig = inpTable.getSignature();
         if (inputSig == null) // table does not exists

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
index c91cbbb..242b1ff 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.dict;
 
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 
 /**
  * To build dictionary, we need a list of distinct values on a column.
@@ -31,5 +31,5 @@ import org.apache.kylin.source.ReadableTable;
 public interface DistinctColumnValuesProvider {
 
     /** Return a ReadableTable contains only one column, each row being a 
distinct value. */
-    public ReadableTable getDistinctValuesFor(TblColRef col);
+    public IReadableTable getDistinctValuesFor(TblColRef col);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java
index 957625d..7caf686 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java
@@ -21,18 +21,18 @@ package org.apache.kylin.dict;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 
 /**
  * Created by dongli on 10/29/15.
  */
 public class TableColumnValueEnumerator implements IDictionaryValueEnumerator {
 
-    private ReadableTable.TableReader reader;
+    private IReadableTable.TableReader reader;
     private int colIndex;
     private String colValue;
 
-    public TableColumnValueEnumerator(ReadableTable.TableReader reader, int 
colIndex) {
+    public TableColumnValueEnumerator(IReadableTable.TableReader reader, int 
colIndex) {
         this.reader = reader;
         this.colIndex = colIndex;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java
index c57cc5e..8f9f74f 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueSortedEnumerator.java
@@ -24,14 +24,14 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.PriorityQueue;
 
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 
 /**
  * Created by xiefan46 on 11/14/16.
  */
 public class TableColumnValueSortedEnumerator implements 
IDictionaryValueEnumerator {
 
-    private Collection<ReadableTable.TableReader> readers;
+    private Collection<IReadableTable.TableReader> readers;
 
     private int colIndex;
 
@@ -41,7 +41,7 @@ public class TableColumnValueSortedEnumerator implements 
IDictionaryValueEnumera
 
     private PriorityQueue<ReaderBuffer> pq;
 
-    public 
TableColumnValueSortedEnumerator(Collection<ReadableTable.TableReader> readers, 
int colIndex, final Comparator<String> comparator) {
+    public 
TableColumnValueSortedEnumerator(Collection<IReadableTable.TableReader> 
readers, int colIndex, final Comparator<String> comparator) {
         this.readers = readers;
         this.colIndex = colIndex;
         this.comparator = comparator;
@@ -59,7 +59,7 @@ public class TableColumnValueSortedEnumerator implements 
IDictionaryValueEnumera
                 return comparator.compare(i.peek(), j.peek());
             }
         });
-        for (ReadableTable.TableReader reader : readers) {
+        for (IReadableTable.TableReader reader : readers) {
             if (reader != null) {
                 try {
                     pq.add(new ReaderBuffer(reader));
@@ -92,7 +92,7 @@ public class TableColumnValueSortedEnumerator implements 
IDictionaryValueEnumera
 
     @Override
     public void close() throws IOException {
-        for (ReadableTable.TableReader reader : readers) {
+        for (IReadableTable.TableReader reader : readers) {
             if (reader != null)
                 reader.close();
         }
@@ -104,7 +104,7 @@ public class TableColumnValueSortedEnumerator implements 
IDictionaryValueEnumera
     }
 
     final class ReaderBuffer {
-        public ReaderBuffer(ReadableTable.TableReader reader) throws 
IOException {
+        public ReaderBuffer(IReadableTable.TableReader reader) throws 
IOException {
             this.reader = reader;
             reload();
         }
@@ -148,7 +148,7 @@ public class TableColumnValueSortedEnumerator implements 
IDictionaryValueEnumera
 
         private String cache;
 
-        private ReadableTable.TableReader reader;
+        private IReadableTable.TableReader reader;
 
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index 4b96622..886de22 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -25,7 +25,7 @@ import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 
 /**
  * @author yangli9
@@ -61,7 +61,7 @@ public class LookupStringTable extends LookupTable<String> {
     boolean[] colIsDateTime;
     boolean[] colIsNumber;
 
-    public LookupStringTable(TableDesc tableDesc, String[] keyColumns, 
ReadableTable table) throws IOException {
+    public LookupStringTable(TableDesc tableDesc, String[] keyColumns, 
IReadableTable table) throws IOException {
         super(tableDesc, keyColumns, table);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index 90ca500..a99ef29 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -31,8 +31,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableReader;
 
 import com.google.common.collect.Sets;
 
@@ -46,10 +46,10 @@ abstract public class LookupTable<T> {
 
     protected TableDesc tableDesc;
     protected String[] keyColumns;
-    protected ReadableTable table;
+    protected IReadableTable table;
     protected Map<Array<T>, T[]> data;
 
-    public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable 
table) throws IOException {
+    public LookupTable(TableDesc tableDesc, String[] keyColumns, 
IReadableTable table) throws IOException {
         this.tableDesc = tableDesc;
         this.keyColumns = keyColumns;
         this.table = table;

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index a912696..b997a88 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -29,8 +29,8 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,7 +121,7 @@ public class SnapshotManager {
         snapshotCache.invalidate(resourcePath);
     }
 
-    public SnapshotTable buildSnapshot(ReadableTable table, TableDesc 
tableDesc) throws IOException {
+    public SnapshotTable buildSnapshot(IReadableTable table, TableDesc 
tableDesc) throws IOException {
         SnapshotTable snapshot = new SnapshotTable(table, 
tableDesc.getIdentity());
         snapshot.updateRandomUuid();
 
@@ -141,7 +141,7 @@ public class SnapshotManager {
         return trySaveNewSnapshot(snapshot);
     }
 
-    public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc 
tableDesc, String overwriteUUID) throws IOException {
+    public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc 
tableDesc, String overwriteUUID) throws IOException {
         SnapshotTable snapshot = new SnapshotTable(table, 
tableDesc.getIdentity());
         snapshot.setUuid(overwriteUUID);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index 9d38dba..1d7e474 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -38,7 +38,7 @@ import org.apache.kylin.dict.TrieDictionary;
 import org.apache.kylin.dict.TrieDictionaryBuilder;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -49,7 +49,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  */
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
-public class SnapshotTable extends RootPersistentEntity implements 
ReadableTable {
+public class SnapshotTable extends RootPersistentEntity implements 
IReadableTable {
 
     @JsonProperty("tableName")
     private String tableName;
@@ -65,13 +65,13 @@ public class SnapshotTable extends RootPersistentEntity 
implements ReadableTable
     public SnapshotTable() {
     }
 
-    SnapshotTable(ReadableTable table, String tableName) throws IOException {
+    SnapshotTable(IReadableTable table, String tableName) throws IOException {
         this.tableName = tableName;
         this.signature = table.getSignature();
         this.useDictionary = true;
     }
 
-    public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws 
IOException {
+    public void takeSnapshot(IReadableTable table, TableDesc tableDesc) throws 
IOException {
         this.signature = table.getSignature();
 
         int maxIndex = tableDesc.getMaxColumnIndex();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java 
b/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java
index 4ee279f..53c9476 100644
--- 
a/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java
+++ 
b/core-dictionary/src/test/java/org/apache/kylin/dict/MockupReadableTable.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 
-public class MockupReadableTable implements ReadableTable {
+public class MockupReadableTable implements IReadableTable {
 
-    public static ReadableTable newSingleColumnTable(String path, String... 
values) {
+    public static IReadableTable newSingleColumnTable(String path, String... 
values) {
         TableSignature sig = new TableSignature(path, values.length, 0);
         List<String[]> content = new ArrayList<>();
         for (String v : values) {
@@ -35,7 +35,7 @@ public class MockupReadableTable implements ReadableTable {
         return new MockupReadableTable(content, sig, true);
     }
     
-    public static ReadableTable newNonExistTable(String path) {
+    public static IReadableTable newNonExistTable(String path) {
         TableSignature sig = new TableSignature(path, -1, 0);
         return new MockupReadableTable(null, sig, false);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/IReadableTable.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/IReadableTable.java 
b/core-metadata/src/main/java/org/apache/kylin/source/IReadableTable.java
new file mode 100644
index 0000000..a0d0642
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/IReadableTable.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A table that can be read.
+ */
+public interface IReadableTable {
+
+    /**
+     * Returns a reader to read the table.
+     */
+    public TableReader getReader() throws IOException;
+
+    /**
+     * Used to detect table modifications.
+     */
+    public TableSignature getSignature() throws IOException;
+
+    public boolean exists() throws IOException;
+
+    public interface TableReader extends Closeable {
+
+        /**
+         * Move to the next row; return false if there are no more records.
+         */
+        public boolean next() throws IOException;
+
+        /**
+         * Get the current row.
+         */
+        public String[] getRow();
+
+    }
+
+    // 
============================================================================
+
+    @SuppressWarnings("serial")
+    @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
+    public class TableSignature implements Serializable {
+
+        @JsonProperty("path")
+        private String path;
+        @JsonProperty("size")
+        private long size;
+        @JsonProperty("last_modified_time")
+        private long lastModifiedTime;
+
+        // for JSON serialization
+        public TableSignature() {
+        }
+
+        public TableSignature(String path, long size, long lastModifiedTime) {
+            super();
+            this.path = path;
+            this.size = size;
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        public void setSize(long size) {
+            this.size = size;
+        }
+
+        public void setLastModifiedTime(long lastModifiedTime) {
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public long getLastModifiedTime() {
+            return lastModifiedTime;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (int) (lastModifiedTime ^ 
(lastModifiedTime >>> 32));
+            result = prime * result + ((path == null) ? 0 : path.hashCode());
+            result = prime * result + (int) (size ^ (size >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            TableSignature other = (TableSignature) obj;
+            if (lastModifiedTime != other.lastModifiedTime)
+                return false;
+            if (path == null) {
+                if (other.path != null)
+                    return false;
+            } else if (!path.equals(other.path))
+                return false;
+            if (size != other.size)
+                return false;
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "FileSignature [path=" + path + ", size=" + size + ", 
lastModifiedTime=" + lastModifiedTime + "]";
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java 
b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 5bff8a7..302c53c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -18,18 +18,33 @@
 
 package org.apache.kylin.source;
 
-import java.util.List;
-
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 
+/**
+ * Represents a kind of source to Kylin, like Hive.
+ */
 public interface ISource {
 
-    <I> I adaptToBuildEngine(Class<I> engineInterface);
-
-    ReadableTable createReadableTable(TableDesc tableDesc);
+    /** 
+     * Return an explorer to sync table metadata from the data source.
+     */
+    ISourceMetadataExplorer getSourceMetadataExplorer();
 
-    List<String> getMRDependentResources(TableDesc table);
+    /**
+     * Return an adaptor that implements the specified interface as requested by 
the build engine.
+     * IMRInput, in particular, is required by the MR build engine.
+     */
+    <I> I adaptToBuildEngine(Class<I> engineInterface);
 
-    SourcePartition parsePartitionBeforeBuild(IBuildable buildable, 
SourcePartition srcPartition);
+    /**
+     * Return an IReadableTable that can iterate through the rows of the given table.
+     */
+    IReadableTable createReadableTable(TableDesc tableDesc);
+    
+    /**
+     * Give the source a chance to enrich a SourcePartition before the build starts.
+     * In particular, the Kafka source uses this chance to define start/end offsets 
within each partition.
+     */
+    SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, 
SourcePartition srcPartition);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/ISourceMetadataExplorer.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/ISourceMetadataExplorer.java
 
b/core-metadata/src/main/java/org/apache/kylin/source/ISourceMetadataExplorer.java
new file mode 100644
index 0000000..b746f19
--- /dev/null
+++ 
b/core-metadata/src/main/java/org/apache/kylin/source/ISourceMetadataExplorer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+
+public interface ISourceMetadataExplorer {
+
+    List<String> listDatabases() throws Exception;
+    
+    List<String> listTables(String database) throws Exception;
+    
+    Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String 
table) throws Exception;
+    
+    List<String> getRelatedKylinResources(TableDesc table);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/ReadableTable.java 
b/core-metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
deleted file mode 100644
index c62f1fe..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- */
-public interface ReadableTable {
-
-    /**
-     * Returns a reader to read the table.
-     */
-    public TableReader getReader() throws IOException;
-
-    /**
-     * Used to detect table modifications.
-     */
-    public TableSignature getSignature() throws IOException;
-    
-    public boolean exists() throws IOException;
-
-
-    public interface TableReader extends Closeable {
-
-        /**
-         * Move to the next row, return false if no more record.
-         */
-        public boolean next() throws IOException;
-
-        /**
-         * Get the current row.
-         */
-        public String[] getRow();
-
-    }
-
-    // 
============================================================================
-
-    @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
-    public class TableSignature implements Serializable{
-
-        @JsonProperty("path")
-        private String path;
-        @JsonProperty("size")
-        private long size;
-        @JsonProperty("last_modified_time")
-        private long lastModifiedTime;
-
-        // for JSON serialization
-        public TableSignature() {
-        }
-
-        public TableSignature(String path, long size, long lastModifiedTime) {
-            super();
-            this.path = path;
-            this.size = size;
-            this.lastModifiedTime = lastModifiedTime;
-        }
-
-        public void setPath(String path) {
-            this.path = path;
-        }
-
-        public void setSize(long size) {
-            this.size = size;
-        }
-
-        public void setLastModifiedTime(long lastModifiedTime) {
-            this.lastModifiedTime = lastModifiedTime;
-        }
-
-        public String getPath() {
-            return path;
-        }
-
-        public long getSize() {
-            return size;
-        }
-
-        public long getLastModifiedTime() {
-            return lastModifiedTime;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + (int) (lastModifiedTime ^ 
(lastModifiedTime >>> 32));
-            result = prime * result + ((path == null) ? 0 : path.hashCode());
-            result = prime * result + (int) (size ^ (size >>> 32));
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            TableSignature other = (TableSignature) obj;
-            if (lastModifiedTime != other.lastModifiedTime)
-                return false;
-            if (path == null) {
-                if (other.path != null)
-                    return false;
-            } else if (!path.equals(other.path))
-                return false;
-            if (size != other.size)
-                return false;
-            return true;
-        }
-
-        @Override
-        public String toString() {
-            return "FileSignature [path=" + path + ", size=" + size + ", 
lastModifiedTime=" + lastModifiedTime + "]";
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java 
b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index 5ce9014..86f89b8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -33,21 +33,26 @@ public class SourceFactory {
         Map<Integer, String> impls = 
KylinConfig.getInstanceFromEnv().getSourceEngines();
         sources = new ImplementationSwitch<>(impls, ISource.class);
     }
+    
+    public static ISource getDefaultSource() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        return sources.get(config.getDefaultSource());
+    }
 
-    public static ISource tableSource(ISourceAware aware) {
+    public static ISource getSource(ISourceAware aware) {
         return sources.get(aware.getSourceType());
     }
 
-    public static ReadableTable createReadableTable(TableDesc table) {
-        return tableSource(table).createReadableTable(table);
+    public static IReadableTable createReadableTable(TableDesc table) {
+        return getSource(table).createReadableTable(table);
     }
 
     public static <T> T createEngineAdapter(ISourceAware table, Class<T> 
engineInterface) {
-        return tableSource(table).adaptToBuildEngine(engineInterface);
+        return getSource(table).adaptToBuildEngine(engineInterface);
     }
 
     public static List<String> getMRDependentResources(TableDesc table) {
-        return tableSource(table).getMRDependentResources(table);
+        return 
getSource(table).getSourceMetadataExplorer().getRelatedKylinResources(table);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java 
b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
index e489704..43e46c6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
@@ -24,6 +24,14 @@ import java.util.Map;
 import com.google.common.base.Objects;
 
 /**
+ * Defines a set of source records that will be built into a cube segment.
+ * 
+ * There are two main approaches:
+ * 1) by a date range, in the case of time-partitioned tables like Hive.
+ * 2) by an offset range, in the case of offset-based sources like Kafka.
+ * 
+ * For the offset approach, the source can further be partitioned and each 
partition can define
+ * its own start and end offset within that partition.
  */
 public class SourcePartition {
     long startDate;

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 7df80ca..73fc380 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -42,7 +42,7 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -288,7 +288,7 @@ public class CubeTupleConverter implements ITupleConverter {
 
     private static class EnhancedStringLookupTable extends LookupStringTable {
 
-        public EnhancedStringLookupTable(TableDesc tableDesc, String[] 
keyColumns, ReadableTable table) throws IOException {
+        public EnhancedStringLookupTable(TableDesc tableDesc, String[] 
keyColumns, IReadableTable table) throws IOException {
             super(tableDesc, keyColumns, table);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
index 19bc021..c036445 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
@@ -28,11 +28,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 
 /**
  */
-public class DFSFileTable implements ReadableTable {
+public class DFSFileTable implements IReadableTable {
 
     public static final String DELIM_AUTO = "auto";
     public static final String DELIM_COMMA = ",";

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index 8c6b5f5..0c9c3fc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.IReadableTable.TableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 8e77513..aca9853 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -18,16 +18,16 @@
 
 package org.apache.kylin.engine.mr;
 
+import java.util.Collection;
+
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
 
-import java.util.Collection;
-
 /**
- * Any ITableSource that wishes to serve as input of MapReduce build engine 
must adapt to this interface.
+ * Any ISource that wishes to serve as input of MapReduce build engine must 
adapt to this interface.
  */
 public interface IMRInput {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
index f396b5a..bcf4b98 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
@@ -28,7 +28,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.dict.ByteComparator;
 import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
  *
  * You need to ensure that values inside each file is sorted
  */
-public class SortedColumnDFSFile implements ReadableTable {
+public class SortedColumnDFSFile implements IReadableTable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(SortedColumnDFSFile.class);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
index 77719ff..bb00442 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java
@@ -17,7 +17,7 @@
 */
 package org.apache.kylin.engine.mr;
 
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -27,8 +27,8 @@ import java.util.PriorityQueue;
 /**
  * Created by xiefan on 16-11-22.
  */
-public class SortedColumnDFSFileReader implements ReadableTable.TableReader {
-    private Collection<ReadableTable.TableReader> readers;
+public class SortedColumnDFSFileReader implements IReadableTable.TableReader {
+    private Collection<IReadableTable.TableReader> readers;
 
     @SuppressWarnings("unused")
     private Comparator<String> comparator;
@@ -37,7 +37,7 @@ public class SortedColumnDFSFileReader implements 
ReadableTable.TableReader {
 
     private String[] row;
 
-    public SortedColumnDFSFileReader(Collection<ReadableTable.TableReader> 
readers, final Comparator<String> comparator) {
+    public SortedColumnDFSFileReader(Collection<IReadableTable.TableReader> 
readers, final Comparator<String> comparator) {
         this.readers = readers;
         this.comparator = comparator;
         pq = new PriorityQueue<ReaderBuffer>(11, new 
Comparator<ReaderBuffer>() {
@@ -54,7 +54,7 @@ public class SortedColumnDFSFileReader implements 
ReadableTable.TableReader {
                 return comparator.compare(i.peek()[0], j.peek()[0]);
             }
         });
-        for (ReadableTable.TableReader reader : readers) {
+        for (IReadableTable.TableReader reader : readers) {
             if (reader != null) {
                 try {
                     pq.add(new ReaderBuffer(reader));
@@ -91,16 +91,16 @@ public class SortedColumnDFSFileReader implements 
ReadableTable.TableReader {
 
     @Override
     public void close() throws IOException {
-        for (ReadableTable.TableReader reader : readers)
+        for (IReadableTable.TableReader reader : readers)
             reader.close();
     }
 
     static class ReaderBuffer {
-        private ReadableTable.TableReader reader;
+        private IReadableTable.TableReader reader;
 
         private String[] row;
 
-        public ReaderBuffer(ReadableTable.TableReader reader) throws 
IOException {
+        public ReaderBuffer(IReadableTable.TableReader reader) throws 
IOException {
             this.reader = reader;
             reload();
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index be8c305..98ebbb4 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -41,7 +41,7 @@ import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.engine.mr.SortedColumnDFSFile;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +65,7 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
 
         DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new 
DistinctColumnValuesProvider() {
             @Override
-            public ReadableTable getDistinctValuesFor(TblColRef col) {
+            public IReadableTable getDistinctValuesFor(TblColRef col) {
                 return new SortedColumnDFSFile(factColumnsInputPath + "/" + 
col.getIdentity(), col.getType());
             }
         }, new DictionaryProvider() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
index dc593b0..04af4fe 100644
--- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
@@ -43,7 +43,7 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.apache.kylin.source.IReadableTable.TableSignature;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index 08a8cb0..41bdd28 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -290,7 +290,7 @@ public class KylinKryoRegistrator implements 
KryoRegistrator {
         
kyroClasses.add(org.apache.kylin.metadata.project.RealizationEntry.class);
         
kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class);
         
kyroClasses.add(org.apache.kylin.metadata.streaming.StreamingConfig.class);
-        
kyroClasses.add(org.apache.kylin.source.ReadableTable.TableSignature.class);
+        
kyroClasses.add(org.apache.kylin.source.IReadableTable.TableSignature.class);
         kyroClasses.add(org.apache.kylin.storage.hybrid.HybridInstance.class);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java 
b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
index 458703a..4dcfdb2 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
@@ -36,7 +36,7 @@ import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.engine.mr.DFSFileTable;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.IReadableTable;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -119,7 +119,7 @@ public class ITDictionaryManagerTest extends LocalFileMetadataTestCase {
         }
 
         @Override
-        public ReadableTable getDistinctValuesFor(TblColRef col) {
+        public IReadableTable getDistinctValuesFor(TblColRef col) {
             return new DFSFileTable(tmpFilePath, -1);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 58078eb..5719523 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -339,8 +339,8 @@ public class BuildCubeWithEngine {
 
     private Boolean buildSegment(String cubeName, long startDate, long endDate) throws Exception {
         CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-        ISource source = SourceFactory.tableSource(cubeInstance);
-        SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, endDate, 0, 0, null, null));
+        ISource source = SourceFactory.getSource(cubeInstance);
+        SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(0, endDate, 0, 0, null, null));
         CubeSegment segment = cubeManager.appendSegment(cubeInstance, partition.getStartDate(), partition.getEndDate());
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 5310816..9c80413 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -281,8 +281,8 @@ public class BuildCubeWithStream {
 
     protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
         CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-        ISource source = SourceFactory.tableSource(cubeInstance);
-        SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, startOffset, endOffset, null, null));
+        ISource source = SourceFactory.getSource(cubeInstance);
+        SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, startOffset, endOffset, null, null));
         CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index 7aff3ba..d972eeb 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -20,11 +20,13 @@ package org.apache.kylin.source.hive;
 
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,14 +44,17 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
     }
 
     @Test
-    public void test() throws IOException {
-        KylinConfig config = getTestConfig();
-        String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
-        Set<String> loaded = HiveSourceTableLoader.loadHiveTables(toLoad, config);
-
-        assertTrue(loaded.size() == toLoad.length);
-        for (String str : toLoad)
-            assertTrue(loaded.contains(str));
+    public void test() throws Exception {
+        ISource source = SourceFactory.getDefaultSource();
+        ISourceMetadataExplorer explr = source.getSourceMetadataExplorer();
+        Pair<TableDesc, TableExtDesc> pair;
+        
+        pair = explr.loadTableMetadata("DEFAULT", "TEST_KYLIN_FACT");
+        assertTrue(pair.getFirst().getIdentity().equals("DEFAULT.TEST_KYLIN_FACT"));
+        
+        pair = explr.loadTableMetadata("EDW", "TEST_CAL_DT");
+        assertTrue(pair.getFirst().getIdentity().equals("EDW.TEST_CAL_DT"));
+        
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
index 6396749..c56c9bb 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -26,8 +26,8 @@ import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableReader;
 import org.apache.kylin.source.SourceFactory;
 import org.junit.After;
 import org.junit.Before;
@@ -56,7 +56,7 @@ public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
     public void basicTest() throws Exception {
         String tableName = "EDW.TEST_SITES";
         TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
-        ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+        IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
         String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
 
         snapshotMgr.wipeoutCache();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/pom.xml
----------------------------------------------------------------------
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 3db88be..b165b99 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -47,6 +47,8 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        
+        <!-- these plug-in modules, should not have API dependencies -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-storage-hbase</artifactId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 7d4cbe6..5b16995 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -307,7 +307,7 @@ public class CubeController extends BasicController {
                     sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, submitter);
         } catch (Throwable e) {
             logger.error(e.getLocalizedMessage(), e);
-            throw new InternalErrorException(e.getLocalizedMessage());
+            throw new InternalErrorException(e.getLocalizedMessage(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 9785db2..ebbfeb2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -152,7 +152,7 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/{tableNames}/cardinality", method = { RequestMethod.PUT }, produces = { "application/json" })
     @ResponseBody
-    public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) throws IOException {
+    public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) throws Exception {
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
         String[] tables = tableNames.split(",");
         try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java b/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java
index 4a4bf74..bfc5555 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller2/TableControllerV2.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.rest.controller2;
 
+import java.io.IOException;
+
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.rest.controller.BasicController;
 import org.apache.kylin.rest.exception.BadRequestException;
@@ -41,8 +43,6 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 
-import java.io.IOException;
-
 /**
  * @author xduo
  */
@@ -92,7 +92,7 @@ public class TableControllerV2 extends BasicController {
 
     @RequestMapping(value = "/load", method = { RequestMethod.POST }, produces = { "application/vnd.apache.kylin-v2+json" })
     @ResponseBody
-    public EnvelopeResponse loadHiveTablesV2(@RequestHeader("Accept-Language") String lang, @RequestBody HiveTableRequestV2 requestV2) throws IOException {
+    public EnvelopeResponse loadHiveTablesV2(@RequestHeader("Accept-Language") String lang, @RequestBody HiveTableRequestV2 requestV2) throws Exception {
         MsgPicker.setMsg(lang);
 
         return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, tableServiceV2.loadHiveTables(requestV2.getTables(), requestV2.getProject(), requestV2.isNeedProfile()), "");
@@ -115,7 +115,7 @@ public class TableControllerV2 extends BasicController {
 
     @RequestMapping(value = "/cardinality", method = { RequestMethod.POST }, produces = { "application/vnd.apache.kylin-v2+json" })
     @ResponseBody
-    public void generateCardinalityV2(@RequestHeader("Accept-Language") String lang, @RequestBody HiveTableRequestV2 requestV2) throws IOException {
+    public void generateCardinalityV2(@RequestHeader("Accept-Language") String lang, @RequestBody HiveTableRequestV2 requestV2) throws Exception {
         MsgPicker.setMsg(lang);
 
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index ae35de2..d92107b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -57,8 +57,8 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.source.hive.HiveClientFactory;
-import org.apache.kylin.source.hive.IHiveClient;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -253,8 +253,8 @@ public class StorageCleanupJob extends AbstractApplication {
         final String preFix = "kylin_intermediate_";
         final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
 
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-        List<String> hiveTableNames = hiveClient.getHiveTableNames(config.getHiveDatabaseForIntermediateTable());
+        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
+        List<String> hiveTableNames = explr.listTables(config.getHiveDatabaseForIntermediateTable());
         Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() {
             @Override
             public boolean apply(@Nullable String input) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4e5f80f..fd9112b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -211,9 +211,9 @@ public class JobService extends BasicService implements InitializingBean {
         CubeSegment newSeg = null;
         try {
             if (buildType == CubeBuildTypeEnum.BUILD) {
-                ISource source = SourceFactory.tableSource(cube);
+                ISource source = SourceFactory.getSource(cube);
                 SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
-                sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition);
+                sourcePartition = source.enrichSourcePartitionBeforeBuild(cube, sourcePartition);
                 newSeg = getCubeManager().appendSegment(cube, sourcePartition);
                 job = EngineFactory.createBatchCubingJob(newSeg, submitter);
             } else if (buildType == CubeBuildTypeEnum.MERGE) {

Reply via email to