http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
new file mode 100644
index 0000000..5e32935
--- /dev/null
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.model.ExternalFilterDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class TableMetadataManager {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TableMetadataManager.class);
+
+    public static final Serializer<TableDesc> TABLE_SERIALIZER = new 
JsonSerializer<TableDesc>(TableDesc.class);
+    public static final Serializer<TableExtDesc> TABLE_EXT_SERIALIZER = new 
JsonSerializer<TableExtDesc>(
+            TableExtDesc.class);
+    public static final Serializer<ExternalFilterDesc> 
EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>(
+            ExternalFilterDesc.class);
+
+    // static cached instances
+    private static final ConcurrentMap<KylinConfig, TableMetadataManager> 
CACHE = new ConcurrentHashMap<KylinConfig, TableMetadataManager>();
+
+    public static TableMetadataManager getInstance(KylinConfig config) {
+        TableMetadataManager r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (TableMetadataManager.class) {
+            r = CACHE.get(config);
+            if (r != null) {
+                return r;
+            }
+            try {
+                r = new TableMetadataManager(config);
+                CACHE.put(config, r);
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one singleton exist, current keys: 
{}", StringUtils
+                            
.join(Iterators.transform(CACHE.keySet().iterator(), new Function<KylinConfig, 
String>() {
+                                @Nullable
+                                @Override
+                                public String apply(@Nullable KylinConfig 
input) {
+                                    return 
String.valueOf(System.identityHashCode(input));
+                                }
+                            }), ","));
+                }
+
+                return r;
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to init 
TableMetadataManager from " + config, e);
+            }
+        }
+    }
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    // 
============================================================================
+
+    private KylinConfig config;
+    // table name ==> SourceTable
+    private CaseInsensitiveStringCache<TableDesc> srcTableMap;
+    // name => SourceTableExt
+    private CaseInsensitiveStringCache<TableExtDesc> srcTableExtMap;
+    // name => External Filter Desc
+    private CaseInsensitiveStringCache<ExternalFilterDesc> extFilterMap;
+
+    private TableMetadataManager(KylinConfig config) throws IOException {
+        init(config);
+    }
+
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    public ResourceStore getStore() {
+        return ResourceStore.getStore(this.config);
+    }
+
+    public List<TableDesc> listAllTables(String prj) {
+        return Lists.newArrayList(getAllTablesMap(prj).values());
+    }
+
+    public List<ExternalFilterDesc> listAllExternalFilters() {
+        return Lists.newArrayList(extFilterMap.values());
+    }
+
+    public Map<String, TableDesc> getAllTablesMap(String prj) {
+        Map<String, TableDesc> globalTables = new LinkedHashMap<>();
+        Map<String, TableDesc> projectTables = new LinkedHashMap<>();
+
+        for (TableDesc t : srcTableMap.values()) {
+            if (t.getProject() == null)
+                globalTables.put(t.getIdentity(), t);
+            else if (t.getProject().equals(prj))
+                projectTables.put(t.getIdentity(), t);
+        }
+
+        Map<String, TableDesc> result = globalTables;
+        result.putAll(projectTables);
+        return result;
+    }
+
+    /**
+     * Get TableDesc by name
+     */
+    public TableDesc getTableDesc(String tableName, String prj) {
+        if (tableName.indexOf(".") < 0)
+            tableName = "DEFAULT." + tableName;
+
+        tableName.toUpperCase();
+
+        TableDesc result = srcTableMap.get(mapKey(tableName, prj));
+        if (result == null)
+            result = srcTableMap.get(mapKey(tableName, null));
+
+        return result;
+    }
+
+    public ExternalFilterDesc getExtFilterDesc(String filterTableName) {
+        ExternalFilterDesc result = extFilterMap.get(filterTableName);
+        return result;
+    }
+
+    /**
+     * Get table extended info. Keys are defined in {@link MetadataConstants}
+     * 
+     * @param tableName
+     * @return
+     */
+    public TableExtDesc getTableExt(String tableName, String prj) {
+        TableDesc t = getTableDesc(tableName, prj);
+        if (t == null)
+            return null;
+
+        return getTableExt(t);
+    }
+
+    public TableExtDesc getTableExt(TableDesc t) {
+        TableExtDesc result = srcTableExtMap.get(mapKey(t.getIdentity(), 
t.getProject()));
+
+        // avoid returning null, since the TableDesc exists
+        if (null == result) {
+            result = new TableExtDesc();
+            result.setIdentity(t.getIdentity());
+            result.setUuid(UUID.randomUUID().toString());
+            result.setLastModified(0);
+            result.init(t.getProject());
+            srcTableExtMap.put(mapKey(t.getIdentity(), t.getProject()), 
result);
+        }
+        return result;
+    }
+
+    public void saveTableExt(TableExtDesc tableExt, String prj) throws 
IOException {
+        if (tableExt.getUuid() == null || tableExt.getIdentity() == null) {
+            throw new IllegalArgumentException();
+        }
+
+        // updating a legacy global table
+        if (tableExt.getProject() == null) {
+            if (getTableExt(tableExt.getIdentity(), prj).getProject() != null)
+                throw new IllegalStateException(
+                        "Updating a legacy global TableExtDesc while a project 
level version exists: "
+                                + tableExt.getIdentity() + ", " + prj);
+            prj = tableExt.getProject();
+        }
+
+        tableExt.init(prj);
+
+        String path = TableExtDesc.concatResourcePath(tableExt.getIdentity(), 
prj);
+
+        ResourceStore store = getStore();
+
+        TableExtDesc t = store.getResource(path, TableExtDesc.class, 
TABLE_EXT_SERIALIZER);
+        if (t != null && t.getIdentity() == null)
+            store.deleteResource(path);
+
+        store.putResource(path, tableExt, TABLE_EXT_SERIALIZER);
+        srcTableExtMap.put(mapKey(tableExt.getIdentity(), 
tableExt.getProject()), tableExt);
+    }
+
+    public void removeTableExt(String tableName, String prj) throws 
IOException {
+        // note, here assume always delete TableExtDesc first, then TableDesc
+        TableExtDesc t = getTableExt(tableName, prj);
+        if (t == null)
+            return;
+
+        String path = TableExtDesc.concatResourcePath(t.getIdentity(), 
t.getProject());
+        getStore().deleteResource(path);
+        srcTableExtMap.remove(mapKey(t.getIdentity(), t.getProject()));
+    }
+
+    public void saveSourceTable(TableDesc srcTable, String prj) throws 
IOException {
+        if (srcTable.getUuid() == null || srcTable.getIdentity() == null) {
+            throw new IllegalArgumentException();
+        }
+
+        srcTable.init(prj);
+
+        String path = TableDesc.concatResourcePath(srcTable.getIdentity(), 
prj);
+        getStore().putResource(path, srcTable, TABLE_SERIALIZER);
+
+        srcTableMap.put(mapKey(srcTable.getIdentity(), prj), srcTable);
+    }
+
+    public void removeSourceTable(String tableIdentity, String prj) throws 
IOException {
+        TableDesc t = getTableDesc(tableIdentity, prj);
+        if (t == null)
+            return;
+
+        String path = TableDesc.concatResourcePath(t.getIdentity(), 
t.getProject());
+        getStore().deleteResource(path);
+        srcTableMap.remove(mapKey(t.getIdentity(), t.getProject()));
+    }
+
+    public void saveExternalFilter(ExternalFilterDesc desc) throws IOException 
{
+        if (desc.getUuid() == null) {
+            throw new IllegalArgumentException("UUID not set.");
+        }
+        String path = desc.getResourcePath();
+        getStore().putResource(path, desc, EXTERNAL_FILTER_DESC_SERIALIZER);
+        desc = reloadExternalFilterAt(path);
+        extFilterMap.put(desc.getName(), desc);
+
+    }
+
+    public void removeExternalFilter(String name) throws IOException {
+        String path = ExternalFilterDesc.concatResourcePath(name);
+        getStore().deleteResource(path);
+        extFilterMap.remove(name);
+
+    }
+
+    private void init(KylinConfig config) throws IOException {
+        this.config = config;
+        this.srcTableMap = new CaseInsensitiveStringCache<>(config, "table");
+        this.srcTableExtMap = new CaseInsensitiveStringCache<>(config, 
"table_ext");
+        this.extFilterMap = new CaseInsensitiveStringCache<>(config, 
"external_filter");
+
+        reloadAllSourceTable();
+        reloadAllTableExt();
+        reloadAllExternalFilter();
+
+        // touch lower level metadata before registering my listener
+        Broadcaster.getInstance(config).registerListener(new 
SrcTableSyncListener(), "table");
+        Broadcaster.getInstance(config).registerListener(new 
SrcTableExtSyncListener(), "table_ext");
+        Broadcaster.getInstance(config).registerListener(new 
ExtFilterSyncListener(), "external_filter");
+    }
+
+    private class SrcTableSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                throws IOException {
+            if (event == Event.DROP)
+                srcTableMap.removeLocal(cacheKey);
+            else
+                reloadSourceTableAt(TableDesc.concatRawResourcePath(cacheKey));
+
+            Pair<String, String> pair = TableDesc.parseResourcePath(cacheKey);
+            String table = pair.getFirst();
+            String prj = pair.getSecond();
+
+            if (prj == null) {
+                for (ProjectInstance p : 
ProjectManager.getInstance(config).findProjectsByTable(table)) {
+                    broadcaster.notifyProjectSchemaUpdate(p.getName());
+                }
+            } else {
+                broadcaster.notifyProjectSchemaUpdate(prj);
+            }
+        }
+    }
+
+    private class SrcTableExtSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                throws IOException {
+            if (event == Event.DROP)
+                srcTableExtMap.removeLocal(cacheKey);
+            else
+                reloadTableExtAt(TableExtDesc.concatRawResourcePath(cacheKey));
+        }
+    }
+
+    private class ExtFilterSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                throws IOException {
+            if (event == Event.DROP)
+                extFilterMap.removeLocal(cacheKey);
+            else
+                reloadExtFilter(cacheKey);
+        }
+    }
+
+    private void reloadAllTableExt() throws IOException {
+        ResourceStore store = getStore();
+        logger.debug("Reloading Table_exd info from folder "
+                + 
store.getReadableResourcePath(ResourceStore.TABLE_EXD_RESOURCE_ROOT));
+
+        srcTableExtMap.clear();
+
+        List<String> paths = 
store.collectResourceRecursively(ResourceStore.TABLE_EXD_RESOURCE_ROOT,
+                MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+            reloadTableExtAt(path);
+        }
+
+        logger.debug("Loaded " + srcTableExtMap.size() + " SourceTable 
EXD(s)");
+    }
+
+    private TableExtDesc reloadTableExtAt(String path) throws IOException {
+        ResourceStore store = getStore();
+        String prj = TableExtDesc.parseResourcePath(path).getSecond();
+
+        TableExtDesc t = store.getResource(path, TableExtDesc.class, 
TABLE_EXT_SERIALIZER);
+
+        if (t == null) {
+            return null;
+        }
+
+        // convert old tableExt json to new one
+        if (t.getIdentity() == null) {
+            t = convertOldTableExtToNewer(path);
+        }
+
+        t.init(prj);
+
+        srcTableExtMap.putLocal(mapKey(t.getIdentity(), prj), t);
+        return t;
+    }
+
+    private String mapKey(String identity, String prj) {
+        return prj == null ? identity : identity + "--" + prj;
+    }
+
+    private TableExtDesc convertOldTableExtToNewer(String path) throws 
IOException {
+        Map<String, String> attrs = Maps.newHashMap();
+
+        ResourceStore store = getStore();
+        RawResource res = store.getResource(path);
+
+        InputStream is = res.inputStream;
+
+        try {
+            attrs.putAll(JsonUtil.readValue(is, HashMap.class));
+        } finally {
+            if (is != null)
+                is.close();
+        }
+
+        String cardinality = 
attrs.get(MetadataConstants.TABLE_EXD_CARDINALITY);
+
+        // parse table identity from file name
+        String file = path;
+        if (file.indexOf("/") > -1) {
+            file = file.substring(file.lastIndexOf("/") + 1);
+        }
+        String tableIdentity = file.substring(0, file.length() - 
MetadataConstants.FILE_SURFIX.length()).toUpperCase();
+        TableExtDesc result = new TableExtDesc();
+        result.setIdentity(tableIdentity);
+        result.setUuid(UUID.randomUUID().toString());
+        result.setLastModified(0);
+        result.setCardinality(cardinality);
+        return result;
+    }
+
+    private void reloadAllExternalFilter() throws IOException {
+        ResourceStore store = getStore();
+        logger.debug("Reloading ExternalFilter from folder "
+                + 
store.getReadableResourcePath(ResourceStore.EXTERNAL_FILTER_RESOURCE_ROOT));
+
+        extFilterMap.clear();
+
+        List<String> paths = 
store.collectResourceRecursively(ResourceStore.EXTERNAL_FILTER_RESOURCE_ROOT,
+                MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+            reloadExternalFilterAt(path);
+        }
+
+        logger.debug("Loaded " + extFilterMap.size() + " ExternalFilter(s)");
+    }
+
+    private void reloadAllSourceTable() throws IOException {
+        ResourceStore store = getStore();
+        logger.debug("Reloading SourceTable from folder "
+                + 
store.getReadableResourcePath(ResourceStore.TABLE_RESOURCE_ROOT));
+
+        srcTableMap.clear();
+
+        List<String> paths = 
store.collectResourceRecursively(ResourceStore.TABLE_RESOURCE_ROOT,
+                MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+            reloadSourceTableAt(path);
+        }
+
+        logger.debug("Loaded " + srcTableMap.size() + " SourceTable(s)");
+    }
+
+    private TableDesc reloadSourceTableAt(String path) throws IOException {
+        ResourceStore store = getStore();
+        String prj = TableDesc.parseResourcePath(path).getSecond();
+
+        TableDesc t = store.getResource(path, TableDesc.class, 
TABLE_SERIALIZER);
+        if (t == null) {
+            return null;
+        }
+        t.init(prj);
+
+        srcTableMap.putLocal(mapKey(t.getIdentity(), prj), t);
+
+        return t;
+    }
+
+    private ExternalFilterDesc reloadExternalFilterAt(String path) throws 
IOException {
+        ResourceStore store = getStore();
+        ExternalFilterDesc t = store.getResource(path, 
ExternalFilterDesc.class, EXTERNAL_FILTER_DESC_SERIALIZER);
+        if (t == null) {
+            return null;
+        }
+        extFilterMap.putLocal(t.getName(), t);
+
+        return t;
+    }
+
+    public void reloadExtFilter(String extFilterName) throws IOException {
+        
reloadExternalFilterAt(ExternalFilterDesc.concatResourcePath(extFilterName));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index ecb8e61..6d3d541 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -26,9 +26,9 @@ import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -134,7 +134,7 @@ public class ColumnTupleFilter extends TupleFilter {
             String col = BytesUtil.readUTFString(buffer);
             
             KylinConfig config = KylinConfig.getInstanceFromEnv();
-            DataModelDesc modelDesc = 
MetadataManager.getInstance(config).getDataModelDesc(model);
+            DataModelDesc modelDesc = 
DataModelManager.getInstance(config).getDataModelDesc(model);
             this.columnRef = modelDesc.findColumn(alias, col);
             
         } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
index e4e311e..edfde43 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.FunctionTupleFilter;
@@ -124,7 +124,7 @@ public class MassInTupleFilter extends FunctionTupleFilter {
 
             if (filterTableName == null) {
                 filterTableName = (String) child.getValues().iterator().next();
-                ExternalFilterDesc externalFilterDesc = 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getExtFilterDesc(filterTableName);
+                ExternalFilterDesc externalFilterDesc = 
TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getExtFilterDesc(filterTableName);
                 if (externalFilterDesc == null) {
                     throw new IllegalArgumentException("External filter named 
" + filterTableName + " is not found");
                 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 1a48558..f3def90 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -29,15 +29,11 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.JoinsTree.Chain;
@@ -50,11 +46,6 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -112,10 +103,6 @@ public class DataModelDesc extends RootPersistentEntity {
     @JsonProperty("capacity")
     private RealizationCapacity capacity = RealizationCapacity.MEDIUM;
 
-    @JsonProperty("computed_columns")
-    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-    private List<ComputedColumnDesc> computedColumnDescs = 
Lists.newArrayList();
-
     // computed attributes
     private TableRef rootFactTableRef;
     private Set<TableRef> factTableRefs = Sets.newLinkedHashSet();
@@ -346,16 +333,7 @@ public class DataModelDesc extends RootPersistentEntity {
         throw new IllegalArgumentException("Table not found by " + 
tableIdentity + " in model " + name);
     }
 
-    public void init(KylinConfig config, Map<String, TableDesc> 
originalTables, List<DataModelDesc> dataModelDescs) {
-        //tweak the tables according to Computed Columns defined in model
-        Map<String, TableDesc> tables = Maps.newHashMap();
-        for (Map.Entry<String, TableDesc> entry : originalTables.entrySet()) {
-            String s = entry.getKey();
-            TableDesc tableDesc = entry.getValue();
-            TableDesc extendedTableDesc = 
tableDesc.appendColumns(createComputedColumns(tableDesc));
-            tables.put(s, extendedTableDesc);
-        }
-
+    public void init(KylinConfig config, Map<String, TableDesc> tables, 
List<DataModelDesc> otherModels) {
         this.config = config;
 
         initJoinTablesForUpgrade();
@@ -365,34 +343,14 @@ public class DataModelDesc extends RootPersistentEntity {
         initJoinsTree();
         initDimensionsAndMetrics();
         initPartitionDesc();
-        initComputedColumns(dataModelDescs);
         initFilterCondition();
 
         boolean reinit = validate();
         if (reinit) { // model slightly changed by validate() and must init() 
again
-            init(config, tables, dataModelDescs);
+            init(config, tables, otherModels);
         }
     }
 
-    private ColumnDesc[] createComputedColumns(final TableDesc tableDesc) {
-        final MutableInt id = new MutableInt(tableDesc.getColumnCount());
-        return FluentIterable.from(this.computedColumnDescs).filter(new 
Predicate<ComputedColumnDesc>() {
-            @Override
-            public boolean apply(@Nullable ComputedColumnDesc input) {
-                return 
tableDesc.getIdentity().equalsIgnoreCase(input.getTableIdentity());
-            }
-        }).transform(new Function<ComputedColumnDesc, ColumnDesc>() {
-            @Nullable
-            @Override
-            public ColumnDesc apply(@Nullable ComputedColumnDesc input) {
-                id.increment();
-                ColumnDesc columnDesc = new ColumnDesc(id.toString(), 
input.getColumnName(), input.getDatatype(),
-                        input.getComment(), null, null, input.getExpression());
-                return columnDesc;
-            }
-        }).toArray(ColumnDesc.class);
-    }
-
     private void initJoinTablesForUpgrade() {
         if (joinTables == null) {
             joinTables = new JoinTableDesc[0];
@@ -486,47 +444,6 @@ public class DataModelDesc extends RootPersistentEntity {
             this.partitionDesc.init(this);
     }
 
-    private void initComputedColumns(List<DataModelDesc> allDataModelDescs) {
-        Preconditions.checkNotNull(allDataModelDescs);
-
-        List<Pair<ComputedColumnDesc, DataModelDesc>> existingCCs = 
Lists.newArrayList();
-
-        for (DataModelDesc dataModelDesc : allDataModelDescs) {
-            if (!StringUtils.equals(dataModelDesc.getName(), this.getName())) {
-                for (ComputedColumnDesc cc : 
dataModelDesc.getComputedColumnDescs()) {
-                    existingCCs.add(Pair.newPair(cc, dataModelDesc));
-                }
-            }
-        }
-
-        for (ComputedColumnDesc newCC : this.computedColumnDescs) {
-
-            newCC.init(aliasMap, rootFactTableRef.getAlias());
-            final String newCCFullName = newCC.getFullName();
-            final String newCCColumnName = newCC.getColumnName();
-
-            for (Pair<ComputedColumnDesc, DataModelDesc> pair : existingCCs) {
-                DataModelDesc dataModelDesc = pair.getSecond();
-                ComputedColumnDesc cc = pair.getFirst();
-
-                if (StringUtils.equalsIgnoreCase(cc.getFullName(), 
newCCFullName) && !(cc.equals(newCC))) {
-                    throw new IllegalArgumentException(String.format(
-                            "Column name for computed column %s is already 
used in model %s, you should apply the same expression ' %s ' here, or use a 
different column name.",
-                            newCCFullName, dataModelDesc.getName(), 
cc.getExpression()));
-                }
-
-                if (isTwoCCDefinitionEquals(cc.getExpression(), 
newCC.getExpression())
-                        && !StringUtils.equalsIgnoreCase(cc.getColumnName(), 
newCCColumnName)) {
-                    throw new IllegalArgumentException(String.format(
-                            "Expression %s in computed column %s is already 
defined by computed column %s from model %s, you should use the same column 
name: ' %s ' .",
-                            newCC.getExpression(), newCCFullName, 
cc.getFullName(), dataModelDesc.getName(),
-                            cc.getColumnName()));
-                }
-            }
-            existingCCs.add(Pair.newPair(newCC, this));
-        }
-    }
-
     //Check if the filter condition is illegal.  
     private void initFilterCondition() {
         if (null == this.filterCondition) {
@@ -569,12 +486,6 @@ public class DataModelDesc extends RootPersistentEntity {
         }
     }
 
-    private boolean isTwoCCDefinitionEquals(String definition0, String 
definition1) {
-        definition0 = definition0.replaceAll("\\s*", "");
-        definition1 = definition1.replaceAll("\\s*", "");
-        return definition0.equalsIgnoreCase(definition1);
-    }
-
     private void initJoinColumns() {
 
         for (JoinTableDesc joinTable : joinTables) {
@@ -813,32 +724,6 @@ public class DataModelDesc extends RootPersistentEntity {
         return dimensions;
     }
 
-    public ComputedColumnDesc findCCByCCColumnName(final String columnName) {
-        return Iterables.find(this.computedColumnDescs, new 
Predicate<ComputedColumnDesc>() {
-            @Override
-            public boolean apply(@Nullable ComputedColumnDesc input) {
-                Preconditions.checkNotNull(input);
-                return columnName.equals(input.getColumnName());
-            }
-        });
-    }
-
-    public Set<String> getComputedColumnNames() {
-        Set<String> ccColumnNames = Sets.newHashSet();
-        for (ComputedColumnDesc cc : this.getComputedColumnDescs()) {
-            ccColumnNames.add(cc.getColumnName());
-        }
-        return Collections.unmodifiableSet(ccColumnNames);
-    }
-
-    public List<ComputedColumnDesc> getComputedColumnDescs() {
-        return computedColumnDescs;
-    }
-
-    public void setComputedColumnDescs(List<ComputedColumnDesc> 
computedColumnDescs) {
-        this.computedColumnDescs = computedColumnDescs;
-    }
-
     public String[] getMetrics() {
         return metrics;
     }
@@ -860,7 +745,10 @@ public class DataModelDesc extends RootPersistentEntity {
     }
 
     public static DataModelDesc getCopyOf(DataModelDesc orig) {
-        DataModelDesc copy = new DataModelDesc();
+        return copy(orig, new DataModelDesc());
+    }
+    
+    public static DataModelDesc copy(DataModelDesc orig, DataModelDesc copy) {
         copy.config = orig.config;
         copy.name = orig.name;
         copy.isDraft = orig.isDraft;
@@ -872,7 +760,6 @@ public class DataModelDesc extends RootPersistentEntity {
         copy.metrics = orig.metrics;
         copy.filterCondition = orig.filterCondition;
         copy.capacity = orig.capacity;
-        copy.computedColumnDescs = orig.computedColumnDescs;
         if (orig.getPartitionDesc() != null) {
             copy.partitionDesc = 
PartitionDesc.getCopyOf(orig.getPartitionDesc());
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
new file mode 100644
index 0000000..fcf47ab
--- /dev/null
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class DataModelManager {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(DataModelManager.class);
+
+    // static cached instances
+    private static final ConcurrentMap<KylinConfig, DataModelManager> CACHE = 
new ConcurrentHashMap<KylinConfig, DataModelManager>();
+
+    public static DataModelManager getInstance(KylinConfig config) {
+        DataModelManager r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (DataModelManager.class) {
+            r = CACHE.get(config);
+            if (r != null) {
+                return r;
+            }
+            r = newInstance(config);
+            CACHE.put(config, r);
+            if (CACHE.size() > 1) {
+                logger.warn("More than one singleton exist, current keys: {}", 
StringUtils
+                        .join(Iterators.transform(CACHE.keySet().iterator(), 
new Function<KylinConfig, String>() {
+                            @Nullable
+                            @Override
+                            public String apply(@Nullable KylinConfig input) {
+                                return 
String.valueOf(System.identityHashCode(input));
+                            }
+                        }), ","));
+            }
+
+            return r;
+        }
+    }
+    
+    private static DataModelManager newInstance(KylinConfig conf) {
+        try {
+            String cls = StringUtil.noBlank(conf.getDataModelManagerImpl(), 
DataModelManager.class.getName());
+            Class<? extends DataModelManager> clz = ClassUtil.forName(cls, 
DataModelManager.class);
+            return clz.getConstructor(KylinConfig.class).newInstance(conf);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to init DataModelManager from " 
+ conf, e);
+        }
+    }
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    // 
============================================================================
+
+    private KylinConfig config;
+    private Serializer<DataModelDesc> serializer;
+    
+    // name => DataModelDesc
+    private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap;
+
+    public DataModelManager(KylinConfig config) throws IOException {
+        init(config);
+    }
+
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    public ResourceStore getStore() {
+        return ResourceStore.getStore(this.config);
+    }
+    
+    public Serializer<DataModelDesc> getDataModelSerializer() {
+        if (serializer == null) {
+            try {
+                String cls = StringUtil.noBlank(config.getDataModelImpl(), 
DataModelDesc.class.getName());
+                Class<? extends DataModelDesc> clz = ClassUtil.forName(cls, 
DataModelDesc.class);
+                serializer = new JsonSerializer(clz);
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return serializer;
+    }
+
+    public List<DataModelDesc> listDataModels() {
+        return Lists.newArrayList(this.dataModelDescMap.values());
+    }
+
+    protected void init(KylinConfig config) throws IOException {
+        this.config = config;
+        this.dataModelDescMap = new CaseInsensitiveStringCache<>(config, 
"data_model");
+
+        // touch lower level metadata before registering model listener
+        TableMetadataManager.getInstance(config);
+        
+        reloadAllDataModel();
+        Broadcaster.getInstance(config).registerListener(new 
DataModelSyncListener(), "data_model");
+    }
+
+    private class DataModelSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
+            for (String model : 
ProjectManager.getInstance(config).getProject(project).getModels()) {
+                reloadDataModelDescAt(DataModelDesc.concatResourcePath(model));
+            }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                throws IOException {
+            if (event == Event.DROP)
+                dataModelDescMap.removeLocal(cacheKey);
+            else
+                
reloadDataModelDescAt(DataModelDesc.concatResourcePath(cacheKey));
+
+            for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjectsByModel(cacheKey)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
+        }
+    }
+
+    public DataModelDesc getDataModelDesc(String name) {
+        return dataModelDescMap.get(name);
+    }
+
+    public List<DataModelDesc> getModels() {
+        return new ArrayList<>(dataModelDescMap.values());
+    }
+
+    public List<DataModelDesc> getModels(String projectName) {
+        ProjectInstance projectInstance = 
ProjectManager.getInstance(config).getProject(projectName);
+        ArrayList<DataModelDesc> ret = new ArrayList<>();
+
+        if (projectInstance != null && projectInstance.getModels() != null) {
+            for (String modelName : projectInstance.getModels()) {
+                DataModelDesc model = getDataModelDesc(modelName);
+                if (null != model) {
+                    ret.add(model);
+                } else {
+                    logger.error("Failed to load model " + modelName);
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    // within a project, find models that use the specified table
+    public List<String> getModelsUsingTable(TableDesc table, String project) 
throws IOException {
+        List<String> models = new ArrayList<>();
+        for (DataModelDesc modelDesc : getModels(project)) {
+            if (modelDesc.containsTable(table))
+                models.add(modelDesc.getName());
+        }
+        return models;
+    }
+
+    public boolean isTableInAnyModel(TableDesc table) {
+        for (DataModelDesc modelDesc : getModels()) {
+            if (modelDesc.containsTable(table))
+                return true;
+        }
+        return false;
+    }
+
+    private void reloadAllDataModel() throws IOException {
+        ResourceStore store = getStore();
+        logger.debug("Reloading DataModel from folder "
+                + 
store.getReadableResourcePath(ResourceStore.DATA_MODEL_DESC_RESOURCE_ROOT));
+
+        dataModelDescMap.clear();
+
+        List<String> paths = 
store.collectResourceRecursively(ResourceStore.DATA_MODEL_DESC_RESOURCE_ROOT,
+                MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+
+            try {
+                logger.info("Reloading data model at " + path);
+                reloadDataModelDescAt(path);
+            } catch (IllegalStateException e) {
+                logger.error("Error to load DataModel at " + path, e);
+                continue;
+            }
+        }
+
+        logger.debug("Loaded " + dataModelDescMap.size() + " DataModel(s)");
+    }
+
+    public DataModelDesc reloadDataModelDescAt(String path) {
+        ResourceStore store = getStore();
+        try {
+            DataModelDesc dataModelDesc = store.getResource(path, 
DataModelDesc.class, getDataModelSerializer());
+            String prj = 
ProjectManager.getInstance(config).getProjectOfModel(dataModelDesc.getName()).getName();
+
+            if (!dataModelDesc.isDraft()) {
+                dataModelDesc.init(config, this.getAllTablesMap(prj), 
listDataModels());
+            }
+
+            dataModelDescMap.putLocal(dataModelDesc.getName(), dataModelDesc);
+            return dataModelDesc;
+        } catch (Exception e) {
+            throw new IllegalStateException("Error to load " + path, e);
+        }
+    }
+
+    // sync on update
+    public DataModelDesc dropModel(DataModelDesc desc) throws IOException {
+        logger.info("Dropping model '" + desc.getName() + "'");
+        ResourceStore store = getStore();
+        store.deleteResource(desc.getResourcePath());
+        // delete model from project
+        
ProjectManager.getInstance(config).removeModelFromProjects(desc.getName());
+        // clean model cache
+        this.afterModelDropped(desc);
+        return desc;
+    }
+
+    private void afterModelDropped(DataModelDesc desc) {
+        dataModelDescMap.remove(desc.getName());
+    }
+
+    public DataModelDesc createDataModelDesc(DataModelDesc desc, String 
projectName, String owner) throws IOException {
+        String name = desc.getName();
+        if (dataModelDescMap.containsKey(name))
+            throw new IllegalArgumentException("DataModelDesc '" + name + "' 
already exists");
+
+        ProjectManager prjMgr = ProjectManager.getInstance(config);
+        ProjectInstance prj = prjMgr.getProject(projectName);
+        if (prj.containsModel(name))
+            throw new IllegalStateException("project " + projectName + " 
already contains model " + name);
+
+        try {
+            // Temporarily register model under project, because we want to 
+            // update project formally after model is saved.
+            prj.getModels().add(name);
+
+            desc.setOwner(owner);
+            desc = saveDataModelDesc(desc);
+
+        } finally {
+            prj.getModels().remove(name);
+        }
+
+        // now that model is saved, update project formally
+        prjMgr.updateModelToProject(name, projectName);
+
+        return desc;
+    }
+
+    public DataModelDesc updateDataModelDesc(DataModelDesc desc) throws 
IOException {
+        String name = desc.getName();
+        if (!dataModelDescMap.containsKey(name)) {
+            throw new IllegalArgumentException("DataModelDesc '" + name + "' 
does not exist.");
+        }
+
+        return saveDataModelDesc(desc);
+    }
+
+    private DataModelDesc saveDataModelDesc(DataModelDesc dataModelDesc) 
throws IOException {
+
+        String prj = 
ProjectManager.getInstance(config).getProjectOfModel(dataModelDesc.getName()).getName();
+
+        if (!dataModelDesc.isDraft())
+            dataModelDesc.init(config, this.getAllTablesMap(prj), 
listDataModels());
+
+        String path = dataModelDesc.getResourcePath();
+        getStore().putResource(path, dataModelDesc, getDataModelSerializer());
+        dataModelDescMap.put(dataModelDesc.getName(), dataModelDesc);
+
+        return dataModelDesc;
+    }
+
+    private Map<String, TableDesc> getAllTablesMap(String prj) {
+        return TableMetadataManager.getInstance(config).getAllTablesMap(prj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 1f54416..5f0e6a3 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -92,13 +92,6 @@ public class ProjectInstance extends RootPersistentEntity {
         return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + 
".json";
     }
 
-    public static String getNormalizedProjectName(String project) {
-        if (project == null)
-            throw new IllegalStateException("Trying to normalized a project 
name which is null");
-
-        return project.toUpperCase();
-    }
-
     public static ProjectInstance create(String name, String owner, String 
description, LinkedHashMap<String, String> overrideProps, 
List<RealizationEntry> realizationEntries, List<String> models) {
         ProjectInstance projectInstance = new ProjectInstance();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
index 1dad89c..df086ec 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
@@ -22,8 +22,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
 
-import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.ExternalFilterDesc;
@@ -52,7 +53,7 @@ class ProjectL2Cache {
     private static final Logger logger = 
LoggerFactory.getLogger(ProjectL2Cache.class);
 
     private ProjectManager mgr;
-    private Map<String, ProjectCache> projectCaches = Maps.newConcurrentMap();
+    private Map<String, ProjectCache> projectCaches = new 
ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
 
     ProjectL2Cache(ProjectManager mgr) {
         this.mgr = mgr;
@@ -178,7 +179,6 @@ class ProjectL2Cache {
     // 
----------------------------------------------------------------------------
 
     private ProjectCache getCache(String project) {
-        project = ProjectInstance.getNormalizedProjectName(project);
         ProjectCache result = projectCaches.get(project);
         if (result == null) {
             result = loadCache(project);
@@ -196,7 +196,7 @@ class ProjectL2Cache {
         if (pi == null)
             throw new IllegalArgumentException("Project '" + project + "' does 
not exist;");
 
-        MetadataManager metaMgr = mgr.getMetadataManager();
+        TableMetadataManager metaMgr = mgr.getTableManager();
 
         for (String tableName : pi.getTables()) {
             TableDesc tableDesc = metaMgr.getTableDesc(tableName, project);
@@ -248,7 +248,7 @@ class ProjectL2Cache {
         if (realization == null)
             return false;
 
-        MetadataManager metaMgr = mgr.getMetadataManager();
+        TableMetadataManager metaMgr = mgr.getTableManager();
 
         Set<TblColRef> allColumns = realization.getAllColumns();
         if (allColumns == null || allColumns.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 527233e..8044797 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -34,7 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
@@ -312,16 +312,15 @@ public class ProjectManager {
         }
     }
 
-    private ProjectInstance addModelToProject(String modelName, String 
project) throws IOException {
-        String newProjectName = 
ProjectInstance.getNormalizedProjectName(project);
-        ProjectInstance newProject = getProject(newProjectName);
-        if (newProject == null) {
-            throw new IllegalArgumentException("Project " + newProjectName + " 
does not exist.");
+    private ProjectInstance addModelToProject(String modelName, String 
prjName) throws IOException {
+        ProjectInstance prj = getProject(prjName);
+        if (prj == null) {
+            throw new IllegalArgumentException("Project " + prjName + " does 
not exist.");
         }
-        newProject.addModel(modelName);
-        updateProject(newProject);
+        prj.addModel(modelName);
+        updateProject(prj);
 
-        return newProject;
+        return prj;
     }
 
     public ProjectInstance moveRealizationToProject(RealizationType type, 
String realizationName, String newProjectName,
@@ -357,7 +356,7 @@ public class ProjectManager {
     }
 
     public ProjectInstance addTableDescToProject(String[] tableIdentities, 
String projectName) throws IOException {
-        MetadataManager metaMgr = getMetadataManager();
+        TableMetadataManager metaMgr = getTableManager();
         ProjectInstance projectInstance = getProject(projectName);
         for (String tableId : tableIdentities) {
             TableDesc table = metaMgr.getTableDesc(tableId, projectName);
@@ -372,7 +371,7 @@ public class ProjectManager {
     }
 
     public void removeTableDescFromProject(String tableIdentities, String 
projectName) throws IOException {
-        MetadataManager metaMgr = getMetadataManager();
+        TableMetadataManager metaMgr = getTableManager();
         ProjectInstance projectInstance = getProject(projectName);
         TableDesc table = metaMgr.getTableDesc(tableIdentities, projectName);
         if (table == null) {
@@ -384,7 +383,7 @@ public class ProjectManager {
     }
 
     public ProjectInstance addExtFilterToProject(String[] filters, String 
projectName) throws IOException {
-        MetadataManager metaMgr = getMetadataManager();
+        TableMetadataManager metaMgr = getTableManager();
         ProjectInstance projectInstance = getProject(projectName);
         for (String filterName : filters) {
             ExternalFilterDesc extFilter = 
metaMgr.getExtFilterDesc(filterName);
@@ -399,7 +398,7 @@ public class ProjectManager {
     }
 
     public void removeExtFilterFromProject(String filterName, String 
projectName) throws IOException {
-        MetadataManager metaMgr = getMetadataManager();
+        TableMetadataManager metaMgr = getTableManager();
         ProjectInstance projectInstance = getProject(projectName);
         ExternalFilterDesc filter = metaMgr.getExtFilterDesc(filterName);
         if (filter == null) {
@@ -527,8 +526,8 @@ public class ProjectManager {
         return ResourceStore.getStore(this.config);
     }
 
-    MetadataManager getMetadataManager() {
-        return MetadataManager.getInstance(config);
+    TableMetadataManager getTableManager() {
+        return TableMetadataManager.getInstance(config);
     }
 
     private String norm(String project) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java
 
b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java
index 8313fce..a8c7303 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 
 public class ColumnGenConfig {
@@ -103,7 +103,7 @@ public class ColumnGenConfig {
         
         KylinConfig kylinConfig = modelGen.getModle().getConfig();
         String project = modelGen.getModle().getProject();
-        ColumnDesc pkcol = MetadataManager.getInstance(kylinConfig)//
+        ColumnDesc pkcol = TableMetadataManager.getInstance(kylinConfig)//
                 .getTableDesc(pkTableName, 
project).findColumnByName(pkColName);
         return modelGen.getPkValues(pkcol);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
 
b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
index 6eb0e71..b7e8d3a 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
@@ -38,12 +38,12 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -307,7 +307,7 @@ public class ModelDataGenerator {
         String outputDir = args.length > 2 ? args[2] : null;
         
         KylinConfig conf = KylinConfig.getInstanceFromEnv();
-        DataModelDesc model = 
MetadataManager.getInstance(conf).getDataModelDesc(modelName);
+        DataModelDesc model = 
DataModelManager.getInstance(conf).getDataModelDesc(modelName);
         ResourceStore store = outputDir == null ? ResourceStore.getStore(conf) 
: ResourceStore.getStore(mockup(outputDir));
         
         ModelDataGenerator gen = new ModelDataGenerator(model, nRows, store);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/test/java/org/apache/kylin/metadata/MetadataManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/MetadataManagerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/MetadataManagerTest.java
deleted file mode 100644
index cfa2324..0000000
--- 
a/core-metadata/src/test/java/org/apache/kylin/metadata/MetadataManagerTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata;
-
-import static org.apache.kylin.metadata.MetadataManager.getInstance;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.JoinTableDesc;
-import org.apache.kylin.metadata.model.ModelDimensionDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- */
-public class MetadataManagerTest extends LocalFileMetadataTestCase {
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void testCiModel() {
-        MetadataManager mgr = getInstance(getTestConfig());
-        DataModelDesc lm = mgr.getDataModelDesc("ci_left_join_model");
-        DataModelDesc im = mgr.getDataModelDesc("ci_inner_join_model");
-        assertSnowflakeQuality(lm);
-        assertSnowflakeQuality(im);
-
-        // check inner/left models are identical apart from the left/inner 
difference
-        assertEquals(lm.getJoinTables().length, im.getJoinTables().length);
-        for (int i = 0, n = im.getJoinTables().length; i < n; i++) {
-            JoinTableDesc lt = lm.getJoinTables()[i];
-            JoinTableDesc it = im.getJoinTables()[i];
-            assertEquals(lt.getAlias(), it.getAlias());
-            assertEquals(lt.getKind(), it.getKind());
-            assertEquals(lt.getTable(), it.getTable());
-            assertArrayEquals(lt.getJoin().getForeignKey(), 
it.getJoin().getForeignKey());
-            assertArrayEquals(lt.getJoin().getPrimaryKey(), 
it.getJoin().getPrimaryKey());
-            assertTrue(lt.getJoin().isLeftJoin());
-            assertTrue(it.getJoin().isInnerJoin());
-        }
-
-        assertEquals(lm.getDimensions().size(), im.getDimensions().size());
-        for (int i = 0, n = im.getDimensions().size(); i < n; i++) {
-            ModelDimensionDesc ld = lm.getDimensions().get(i);
-            ModelDimensionDesc id = im.getDimensions().get(i);
-            assertEquals(ld.getTable(), id.getTable());
-            assertArrayEquals(ld.getColumns(), id.getColumns());
-        }
-
-        assertArrayEquals(lm.getMetrics(), im.getMetrics());
-    }
-
-    private void assertSnowflakeQuality(DataModelDesc model) {
-        Assert.assertNotNull(model);
-        try {
-            model.findTable("TEST_COUNTRY");
-            Assert.fail();
-        } catch (IllegalArgumentException ex) {
-            // excepted
-        }
-
-        Assert.assertNotNull(model.findTable("BUYER_COUNTRY"));
-        Assert.assertNotNull(model.findTable("SELLER_COUNTRY"));
-        Assert.assertNotNull(model.findColumn("BUYER_COUNTRY.NAME"));
-        Assert.assertNotNull(model.findColumn("BUYER_ID"));
-
-    }
-
-    @Test
-    public void testListAllTables() throws Exception {
-        List<TableDesc> tables = 
getInstance(getTestConfig()).listAllTables(null);
-        Assert.assertNotNull(tables);
-        Assert.assertTrue(tables.size() > 0);
-    }
-
-    @Test
-    public void testFindTableByName() throws Exception {
-        TableDesc table = 
getInstance(getTestConfig()).getTableDesc("EDW.TEST_CAL_DT", "default");
-        Assert.assertNotNull(table);
-        Assert.assertEquals("EDW.TEST_CAL_DT", table.getIdentity());
-    }
-
-    @Test
-    public void testGetInstance() throws Exception {
-        Assert.assertNotNull(getInstance(getTestConfig()));
-        Assert.assertNotNull(getInstance(getTestConfig()).listAllTables(null));
-        
Assert.assertTrue(getInstance(getTestConfig()).listAllTables(null).size() > 0);
-    }
-
-    @Test
-    public void testDataModel() throws Exception {
-        DataModelDesc modelDesc = 
getInstance(getTestConfig()).getDataModelDesc("test_kylin_left_join_model_desc");
-        Assert.assertTrue(modelDesc.getDimensions().size() > 0);
-    }
-
-    @Test
-    public void testTableSample() throws IOException {
-        TableExtDesc tableExtDesc = 
getInstance(getTestConfig()).getTableExt("DEFAULT.WIDE_TABLE", "default");
-        Assert.assertNotNull(tableExtDesc);
-
-        List<TableExtDesc.ColumnStats> columnStatsList = new ArrayList<>();
-        TableExtDesc.ColumnStats columnStats = new TableExtDesc.ColumnStats();
-        columnStats.setColumnSamples("Max", "Min", "dfadsfdsfdsafds", "d");
-        columnStatsList.add(columnStats);
-        tableExtDesc.setColumnStats(columnStatsList);
-        getInstance(getTestConfig()).saveTableExt(tableExtDesc, "default");
-
-        TableExtDesc tableExtDesc1 = 
getInstance(getTestConfig()).getTableExt("DEFAULT.WIDE_TABLE", "default");
-        Assert.assertNotNull(tableExtDesc1);
-
-        List<TableExtDesc.ColumnStats> columnStatsList1 = 
tableExtDesc1.getColumnStats();
-        Assert.assertEquals(1, columnStatsList1.size());
-
-        getInstance(getTestConfig()).removeTableExt("DEFAULT.WIDE_TABLE", 
"default");
-    }
-
-    @Test
-    public void testTableExtCompatibility() throws IOException {
-        String tableName = "DEFAULT.WIDE_TABLE";
-        Map<String, String> oldTableExt = new HashMap<>();
-        oldTableExt.put(MetadataConstants.TABLE_EXD_CARDINALITY, "1,2,3,4");
-        mockUpOldTableExtJson(tableName, oldTableExt);
-        TableExtDesc tableExtDesc = 
getInstance(getTestConfig()).getTableExt(tableName, "default");
-        Assert.assertEquals("1,2,3,4,", tableExtDesc.getCardinality());
-        getInstance(getTestConfig()).removeTableExt(tableName, "default");
-    }
-
-    private void mockUpOldTableExtJson(String tableId, Map<String, String> 
tableExdProperties) throws IOException {
-        String path = TableExtDesc.concatResourcePath(tableId, null);
-
-        ByteArrayOutputStream os = new ByteArrayOutputStream();
-        JsonUtil.writeValueIndent(os, tableExdProperties);
-        os.flush();
-        InputStream is = new ByteArrayInputStream(os.toByteArray());
-        getStore().putResource(path, is, System.currentTimeMillis());
-        os.close();
-        is.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/test/java/org/apache/kylin/metadata/TableMetadataManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/TableMetadataManagerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/TableMetadataManagerTest.java
new file mode 100644
index 0000000..ff08ca1
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/TableMetadataManagerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata;
+
+import static org.apache.kylin.metadata.TableMetadataManager.getInstance;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class TableMetadataManagerTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testListAllTables() throws Exception {
+        List<TableDesc> tables = 
getInstance(getTestConfig()).listAllTables(null);
+        Assert.assertNotNull(tables);
+        Assert.assertTrue(tables.size() > 0);
+    }
+
+    @Test
+    public void testFindTableByName() throws Exception {
+        TableDesc table = 
getInstance(getTestConfig()).getTableDesc("EDW.TEST_CAL_DT", "default");
+        Assert.assertNotNull(table);
+        Assert.assertEquals("EDW.TEST_CAL_DT", table.getIdentity());
+    }
+
+    @Test
+    public void testGetInstance() throws Exception {
+        Assert.assertNotNull(getInstance(getTestConfig()));
+        Assert.assertNotNull(getInstance(getTestConfig()).listAllTables(null));
+        
Assert.assertTrue(getInstance(getTestConfig()).listAllTables(null).size() > 0);
+    }
+
+    @Test
+    public void testTableSample() throws IOException {
+        TableExtDesc tableExtDesc = 
getInstance(getTestConfig()).getTableExt("DEFAULT.WIDE_TABLE", "default");
+        Assert.assertNotNull(tableExtDesc);
+
+        List<TableExtDesc.ColumnStats> columnStatsList = new ArrayList<>();
+        TableExtDesc.ColumnStats columnStats = new TableExtDesc.ColumnStats();
+        columnStats.setColumnSamples("Max", "Min", "dfadsfdsfdsafds", "d");
+        columnStatsList.add(columnStats);
+        tableExtDesc.setColumnStats(columnStatsList);
+        getInstance(getTestConfig()).saveTableExt(tableExtDesc, "default");
+
+        TableExtDesc tableExtDesc1 = 
getInstance(getTestConfig()).getTableExt("DEFAULT.WIDE_TABLE", "default");
+        Assert.assertNotNull(tableExtDesc1);
+
+        List<TableExtDesc.ColumnStats> columnStatsList1 = 
tableExtDesc1.getColumnStats();
+        Assert.assertEquals(1, columnStatsList1.size());
+
+        getInstance(getTestConfig()).removeTableExt("DEFAULT.WIDE_TABLE", 
"default");
+    }
+
+    @Test
+    public void testTableExtCompatibility() throws IOException {
+        String tableName = "DEFAULT.WIDE_TABLE";
+        Map<String, String> oldTableExt = new HashMap<>();
+        oldTableExt.put(MetadataConstants.TABLE_EXD_CARDINALITY, "1,2,3,4");
+        mockUpOldTableExtJson(tableName, oldTableExt);
+        TableExtDesc tableExtDesc = 
getInstance(getTestConfig()).getTableExt(tableName, "default");
+        Assert.assertEquals("1,2,3,4,", tableExtDesc.getCardinality());
+        getInstance(getTestConfig()).removeTableExt(tableName, "default");
+    }
+
+    private void mockUpOldTableExtJson(String tableId, Map<String, String> 
tableExdProperties) throws IOException {
+        String path = TableExtDesc.concatResourcePath(tableId, null);
+
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        JsonUtil.writeValueIndent(os, tableExdProperties);
+        os.flush();
+        InputStream is = new ByteArrayInputStream(os.toByteArray());
+        getStore().putResource(path, is, System.currentTimeMillis());
+        os.close();
+        is.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/test/java/org/apache/kylin/metadata/draft/DraftManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/draft/DraftManagerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/draft/DraftManagerTest.java
index 153e82a..1b0fb40 100644
--- 
a/core-metadata/src/test/java/org/apache/kylin/metadata/draft/DraftManagerTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/draft/DraftManagerTest.java
@@ -24,8 +24,8 @@ import java.util.List;
 
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -71,7 +71,7 @@ public class DraftManagerTest extends 
LocalFileMetadataTestCase {
     }
 
     private RootPersistentEntity getSampleModel() {
-        MetadataManager metaMgr = MetadataManager.getInstance(getTestConfig());
+        DataModelManager metaMgr = 
DataModelManager.getInstance(getTestConfig());
         DataModelDesc model = metaMgr.getDataModelDesc("ci_left_join_model");
         return model;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelDescTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelDescTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelDescTest.java
index f6d6fc3..7feefef 100644
--- 
a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelDescTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelDescTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.MetadataManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,19 +44,19 @@ public class DataModelDescTest extends 
LocalFileMetadataTestCase {
 
     @Test
     public void loadInnerModel() {
-        DataModelDesc model = 
MetadataManager.getInstance(getTestConfig()).getDataModelDesc("ci_inner_join_model");
+        DataModelDesc model = 
DataModelManager.getInstance(getTestConfig()).getDataModelDesc("ci_inner_join_model");
         assertNotNull(model);
     }
 
     @Test
     public void loadLeftModel() {
-        DataModelDesc model = 
MetadataManager.getInstance(getTestConfig()).getDataModelDesc("ci_left_join_model");
+        DataModelDesc model = 
DataModelManager.getInstance(getTestConfig()).getDataModelDesc("ci_left_join_model");
         assertNotNull(model);
     }
 
     @Test
     public void testNoDupColInDimAndMeasure() {
-        DataModelDesc model = 
MetadataManager.getInstance(getTestConfig()).getDataModelDesc("test_kylin_inner_join_model_desc");
+        DataModelDesc model = 
DataModelManager.getInstance(getTestConfig()).getDataModelDesc("test_kylin_inner_join_model_desc");
         String[] metrics = model.getMetrics();
         TblColRef col = model.findColumn("edw.test_cal_dt.cal_dt");
         assertTrue(metrics.length == 2);
@@ -66,7 +65,7 @@ public class DataModelDescTest extends 
LocalFileMetadataTestCase {
 
     @Test
     public void testGetCopyOf() throws JsonProcessingException {
-        DataModelDesc desc = 
MetadataManager.getInstance(getTestConfig()).getDataModelDesc("test_kylin_inner_join_model_desc");
+        DataModelDesc desc = 
DataModelManager.getInstance(getTestConfig()).getDataModelDesc("test_kylin_inner_join_model_desc");
         DataModelDesc copyDesc = DataModelDesc.getCopyOf(desc);
 
         // uuid is different, set to equals for json comparison
@@ -81,7 +80,7 @@ public class DataModelDescTest extends 
LocalFileMetadataTestCase {
 
     @Test
     public void testPartitionDescCopyOf() throws JsonProcessingException {
-        PartitionDesc desc = 
MetadataManager.getInstance(getTestConfig()).getDataModelDesc("test_kylin_inner_join_model_desc").partitionDesc;
+        PartitionDesc desc = 
DataModelManager.getInstance(getTestConfig()).getDataModelDesc("test_kylin_inner_join_model_desc").partitionDesc;
         PartitionDesc copyDesc = PartitionDesc.getCopyOf(desc);
 
         String descStr = JsonUtil.writeValueAsIndentString(desc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelManagerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelManagerTest.java
new file mode 100644
index 0000000..ee75302
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DataModelManagerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+import static org.apache.kylin.metadata.model.DataModelManager.getInstance;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class DataModelManagerTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testCiModel() {
+        DataModelManager mgr = getInstance(getTestConfig());
+        DataModelDesc lm = mgr.getDataModelDesc("ci_left_join_model");
+        DataModelDesc im = mgr.getDataModelDesc("ci_inner_join_model");
+        assertSnowflakeQuality(lm);
+        assertSnowflakeQuality(im);
+
+        // check inner/left models are identical apart from the left/inner 
difference
+        assertEquals(lm.getJoinTables().length, im.getJoinTables().length);
+        for (int i = 0, n = im.getJoinTables().length; i < n; i++) {
+            JoinTableDesc lt = lm.getJoinTables()[i];
+            JoinTableDesc it = im.getJoinTables()[i];
+            assertEquals(lt.getAlias(), it.getAlias());
+            assertEquals(lt.getKind(), it.getKind());
+            assertEquals(lt.getTable(), it.getTable());
+            assertArrayEquals(lt.getJoin().getForeignKey(), 
it.getJoin().getForeignKey());
+            assertArrayEquals(lt.getJoin().getPrimaryKey(), 
it.getJoin().getPrimaryKey());
+            assertTrue(lt.getJoin().isLeftJoin());
+            assertTrue(it.getJoin().isInnerJoin());
+        }
+
+        assertEquals(lm.getDimensions().size(), im.getDimensions().size());
+        for (int i = 0, n = im.getDimensions().size(); i < n; i++) {
+            ModelDimensionDesc ld = lm.getDimensions().get(i);
+            ModelDimensionDesc id = im.getDimensions().get(i);
+            assertEquals(ld.getTable(), id.getTable());
+            assertArrayEquals(ld.getColumns(), id.getColumns());
+        }
+
+        assertArrayEquals(lm.getMetrics(), im.getMetrics());
+    }
+
+    private void assertSnowflakeQuality(DataModelDesc model) {
+        Assert.assertNotNull(model);
+        try {
+            model.findTable("TEST_COUNTRY");
+            Assert.fail();
+        } catch (IllegalArgumentException ex) {
+            // excepted
+        }
+
+        Assert.assertNotNull(model.findTable("BUYER_COUNTRY"));
+        Assert.assertNotNull(model.findTable("SELLER_COUNTRY"));
+        Assert.assertNotNull(model.findColumn("BUYER_COUNTRY.NAME"));
+        Assert.assertNotNull(model.findColumn("BUYER_ID"));
+
+    }
+
+    @Test
+    public void testDataModel() throws Exception {
+        DataModelDesc modelDesc = 
getInstance(getTestConfig()).getDataModelDesc("test_kylin_left_join_model_desc");
+        Assert.assertTrue(modelDesc.getDimensions().size() > 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsTreeTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsTreeTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsTreeTest.java
index d5e9de7..85ddb13 100644
--- 
a/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsTreeTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsTreeTest.java
@@ -25,7 +25,6 @@ import java.util.Map.Entry;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.JoinsTree.Chain;
 import org.junit.After;
 import org.junit.Before;
@@ -47,7 +46,7 @@ public class JoinsTreeTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testBasics() {
-        MetadataManager mgr = 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        DataModelManager mgr = 
DataModelManager.getInstance(KylinConfig.getInstanceFromEnv());
         DataModelDesc model = mgr.getDataModelDesc("ci_left_join_model");
         JoinsTree joinsTree = model.getJoinsTree();
         
@@ -62,7 +61,7 @@ public class JoinsTreeTest extends LocalFileMetadataTestCase {
     
     @Test
     public void testMatch() {
-        MetadataManager mgr = 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        DataModelManager mgr = 
DataModelManager.getInstance(KylinConfig.getInstanceFromEnv());
         DataModelDesc model = mgr.getDataModelDesc("ci_inner_join_model");
         JoinsTree joinsTree = model.getJoinsTree();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java 
b/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java
index 016c591..8f1a044 100644
--- 
a/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +59,7 @@ public class DataGenTest extends LocalFileMetadataTestCase {
     }
 
     private DataModelDesc getModel(String name) {
-        MetadataManager mgr = 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        DataModelManager mgr = 
DataModelManager.getInstance(KylinConfig.getInstanceFromEnv());
         DataModelDesc model = mgr.getDataModelDesc(name);
         return model;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index ea7422b..2f5c29d 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -35,7 +35,7 @@ import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -266,7 +266,7 @@ public class CubeTupleConverter implements ITupleConverter {
     public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc 
join) {
         long ts = System.currentTimeMillis();
 
-        MetadataManager metaMgr = 
MetadataManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+        TableMetadataManager metaMgr = 
TableMetadataManager.getInstance(cubeSeg.getCubeInstance().getConfig());
         SnapshotManager snapshotMgr = 
SnapshotManager.getInstance(cubeSeg.getCubeInstance().getConfig());
 
         String tableName = join.getPKSide().getTableIdentity();

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 0084670..b2a2ea3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -30,7 +30,7 @@ import 
org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
-import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.SourceFactory;
@@ -53,7 +53,7 @@ public class MRUtil {
     }
 
     private static TableDesc getTableDesc(String tableName, String prj) {
-        return 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName,
 prj);
+        return 
TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName,
 prj);
     }
 
     public static IMRBatchCubingOutputSide 
getBatchCubingOutputSide(CubeSegment seg) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f45d8133/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
index a1815e2..4d92f8e 100644
--- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
@@ -39,9 +39,9 @@ import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -90,7 +90,7 @@ public class MergeCuboidMapperTest extends 
LocalFileMetadataTestCase {
 
         logger.info("The metadataUrl is : " + getTestConfig());
 
-        MetadataManager.clearCache();
+        DataModelManager.clearCache();
         CubeManager.clearCache();
         ProjectManager.clearCache();
         DictionaryManager.clearCache();

Reply via email to