This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new 8857397  #KYLIN-3977 Avoid mistaken deleting dicts by storage cleanup 
while building jobs are running
8857397 is described below

commit 8857397267204491eb83e722399951de7917d167
Author: PENG Zhengshuai <[email protected]>
AuthorDate: Wed May 15 21:39:00 2019 +0800

    #KYLIN-3977 Avoid mistaken deleting dicts by storage cleanup while building 
jobs are running
    
    * #KYLIN-3977, Avoid mistaken deleting dicts by storage cleanup while 
building jobs are running
---
 .../common/persistence/FileResourceStore.java      |  31 +++-
 .../common/persistence/HDFSResourceStore.java      |  31 +++-
 .../kylin/common/persistence/JDBCResourceSQL.java  |   6 +-
 .../common/persistence/JDBCResourceStore.java      |  48 +++++-
 .../common/persistence/PushdownResourceStore.java  |  15 ++
 .../kylin/common/persistence/ResourceStore.java    | 106 ++++++++++---
 .../common/persistence/ResourceStoreTest.java      | 120 +++++++++++++--
 .../java/org/apache/kylin/cube/CubeManager.java    |  52 ++++---
 .../kylin/cube/cli/DictionaryGeneratorCLI.java     |  77 +++++++++-
 .../org/apache/kylin/dict/DictionaryManager.java   | 141 ++++++++++-------
 .../apache/kylin/dict/lookup/SnapshotManager.java  |  89 +++++++----
 .../apache/kylin/dict/DictionaryManagerTest.java   |  53 +++++--
 .../kylin/dict/lookup/SnapshotManagerTest.java     | 171 +++++++++++++++++++++
 .../kylin/engine/mr/steps/CreateDictionaryJob.java |  10 +-
 .../apache/kylin/cube/ITDictionaryManagerTest.java |  13 +-
 .../apache/kylin/rest/job/MetadataCleanupJob.java  | 136 +++++++++++-----
 .../kylin/rest/job/MetadataCleanupJobTest.java     |  16 +-
 .../kylin/storage/hbase/HBaseResourceStore.java    |  82 ++++++++--
 18 files changed, 959 insertions(+), 238 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index bccb7a3..813eca3 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -43,7 +43,7 @@ public class FileResourceStore extends ResourceStore {
     public FileResourceStore(KylinConfig kylinConfig) {
         super(kylinConfig);
         root = new File(getPath(kylinConfig)).getAbsoluteFile();
-        if (root.exists() == false)
+        if (!root.exists())
             throw new IllegalArgumentException(
                     "File not exist by '" + kylinConfig.getMetadataUrl() + "': 
" + root.getAbsolutePath());
     }
@@ -60,7 +60,7 @@ public class FileResourceStore extends ResourceStore {
 
     @Override
     protected void visitFolderImpl(String folderPath, boolean recursive, 
VisitFilter filter, boolean loadContent,
-                                   Visitor visitor) throws IOException {
+            Visitor visitor) throws IOException {
         if (--failVisitFolderCountDown == 0)
             throw new IOException("for test");
 
@@ -178,6 +178,19 @@ public class FileResourceStore extends ResourceStore {
     }
 
     @Override
+    protected void updateTimestampImpl(String resPath, long timestamp) throws 
IOException {
+        File f = file(resPath);
+        if (f.exists()) {
+            // note file timestamp may lose precision for last two digits of 
timestamp
+            boolean success = f.setLastModified(timestamp);
+            if (!success) {
+                throw new IOException(
+                        "Update resource timestamp failed, resPath:" + resPath 
+ ", timestamp: " + timestamp);
+            }
+        }
+    }
+
+    @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
 
         File f = file(resPath);
@@ -190,6 +203,20 @@ public class FileResourceStore extends ResourceStore {
     }
 
     @Override
+    protected void deleteResourceImpl(String resPath, long timestamp) throws 
IOException {
+        File f = file(resPath);
+        try {
+            if (f.exists()) {
+                long origLastModified = getResourceTimestampImpl(resPath);
+                if (checkTimeStampBeforeDelete(origLastModified, timestamp))
+                    FileUtils.forceDelete(f);
+            }
+        } catch (FileNotFoundException e) {
+            // FileNotFoundException is not a problem in case of delete
+        }
+    }
+
+    @Override
     protected String getReadableResourcePathImpl(String resPath) {
         return file(resPath).toString();
     }
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
index c38a182..03cab1f 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
@@ -65,7 +65,7 @@ public class HDFSResourceStore extends ResourceStore {
 
         fs = HadoopUtil.getFileSystem(path);
         Path metadataPath = new Path(path);
-        if (fs.exists(metadataPath) == false) {
+        if (!fs.exists(metadataPath)) {
             logger.warn("Path not exist in HDFS, create it: {}. ", path);
             createMetaFolder(metadataPath);
         }
@@ -136,7 +136,7 @@ public class HDFSResourceStore extends ResourceStore {
 
     @Override
     protected void visitFolderImpl(String folderPath, boolean recursive, 
VisitFilter filter, boolean loadContent,
-                                   Visitor visitor) throws IOException {
+            Visitor visitor) throws IOException {
         Path p = getRealHDFSPath(folderPath);
         if (!fs.exists(p) || !fs.isDirectory(p)) {
             return;
@@ -248,6 +248,18 @@ public class HDFSResourceStore extends ResourceStore {
     }
 
     @Override
+    protected void updateTimestampImpl(String resPath, long timestamp) throws 
IOException {
+        try {
+            Path p = getRealHDFSPath(resPath);
+            if (fs.exists(p)) {
+                fs.setTimes(p, timestamp, -1);
+            }
+        } catch (Exception e) {
+            throw new IOException("Update resource timestamp fail", e);
+        }
+    }
+
+    @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
         try {
             Path p = getRealHDFSPath(resPath);
@@ -258,6 +270,21 @@ public class HDFSResourceStore extends ResourceStore {
             throw new IOException("Delete resource fail", e);
         }
     }
+
+    @Override
+    protected void deleteResourceImpl(String resPath, long timestamp) throws 
IOException {
+        try {
+            Path p = getRealHDFSPath(resPath);
+            if (fs.exists(p)) {
+                long origLastModified = 
fs.getFileStatus(p).getModificationTime();
+                if (checkTimeStampBeforeDelete(origLastModified, timestamp))
+                    fs.delete(p, true);
+            }
+        } catch (Exception e) {
+            throw new IOException("Delete resource fail", e);
+        }
+    }
+
     @Override
     protected String getReadableResourcePathImpl(String resPath) {
         return getRealHDFSPath(resPath).toString();
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java
index 3dc7b65..54233ea 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java
@@ -30,7 +30,8 @@ public class JDBCResourceSQL {
     final private String metaTableTs;
     final private String metaTableContent;
 
-    public JDBCResourceSQL(String dialect, String tableName, String 
metaTableKey, String metaTableTs, String metaTableContent) {
+    public JDBCResourceSQL(String dialect, String tableName, String 
metaTableKey, String metaTableTs,
+            String metaTableContent) {
         this.format = 
JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(dialect);
         this.tableName = tableName;
         this.metaTableKey = metaTableKey;
@@ -96,8 +97,7 @@ public class JDBCResourceSQL {
         return sql;
     }
 
-    @SuppressWarnings("unused")
-    private String getReplaceSqlWithoutContent() {
+    public String getReplaceSqlWithoutContent() {
         final String sql = new 
MessageFormat(format.getReplaceSqlWithoutContent(), Locale.ROOT)
                 .format(new Object[] { tableName, metaTableTs, metaTableKey }, 
new StringBuffer(), new FieldPosition(0))
                 .toString();
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index 9e5a989..0bb8f4c 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -128,7 +128,7 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                     pstat = connection.prepareStatement(createIndexSql);
                     pstat.executeUpdate();
                 } catch (SQLException ex) {
-                    logger.error("Failed to create index on " + META_TABLE_TS, 
ex);
+                    logger.error("Failed to create index on {}", 
META_TABLE_TS, ex);
                 }
             }
 
@@ -171,7 +171,7 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
 
     @Override
     protected void visitFolderImpl(final String folderPath, final boolean 
recursive, final VisitFilter filter,
-                                   final boolean loadContent, final Visitor 
visitor) throws IOException {
+            final boolean loadContent, final Visitor visitor) throws 
IOException {
 
         try {
             executeSql(new SqlOperation() {
@@ -184,8 +184,8 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                         lookForPrefix = filter.pathPrefix;
                     }
 
-                    if (isRootPath(folderPath)){
-                        for (int i=0; i<tableNames.length; i++){
+                    if (isRootPath(folderPath)) {
+                        for (int i = 0; i < tableNames.length; i++) {
                             final String tableName = tableNames[i];
                             JDBCResourceSQL sqls = 
getJDBCResourceSQL(tableName);
                             String sql = 
sqls.getAllResourceSqlString(loadContent);
@@ -212,7 +212,7 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                                 }
                             }
                         }
-                    }else{
+                    } else {
                         JDBCResourceSQL sqls = 
getJDBCResourceSQL(getMetaTableName(folderPath));
                         String sql = sqls.getAllResourceSqlString(loadContent);
                         pstat = connection.prepareStatement(sql);
@@ -502,8 +502,7 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                                     pushdown.close();
                                 }
                             } else {
-                                pstat2.setBinaryStream(1,
-                                        new BufferedInputStream(new 
ByteArrayInputStream(content)));
+                                pstat2.setBinaryStream(1, new 
BufferedInputStream(new ByteArrayInputStream(content)));
                                 pstat2.setString(2, resPath);
                                 pstat2.executeUpdate();
                             }
@@ -517,6 +516,33 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
     }
 
     @Override
+    protected void updateTimestampImpl(final String resPath, final long 
timestamp) throws IOException {
+        try {
+            boolean skipHdfs = isJsonMetadata(resPath);
+            JDBCResourceSQL sqls = 
getJDBCResourceSQL(getMetaTableName(resPath));
+            executeSql(new SqlOperation() {
+                @Override
+                public void execute(Connection connection) throws SQLException 
{
+                    pstat = 
connection.prepareStatement(sqls.getReplaceSqlWithoutContent());
+                    pstat.setLong(1, timestamp);
+                    pstat.setString(2, resPath);
+                    pstat.executeUpdate();
+                }
+            });
+
+            if (!skipHdfs) {
+                try {
+                    updateTimestampPushdown(resPath, timestamp);
+                } catch (Throwable e) {
+                    throw new SQLException(e);
+                }
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
     protected void deleteResourceImpl(final String resPath) throws IOException 
{
         try {
             boolean skipHdfs = isJsonMetadata(resPath);
@@ -544,6 +570,14 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
     }
 
     @Override
+    protected void deleteResourceImpl(String resPath, long timestamp) throws 
IOException {
+        // considering deletePushDown operation, check timestamp at the 
beginning
+        long origLastModified = getResourceTimestampImpl(resPath);
+        if (checkTimeStampBeforeDelete(origLastModified, timestamp))
+            deleteResourceImpl(resPath);
+    }
+
+    @Override
     protected String getReadableResourcePathImpl(String resPath) {
         return metadataIdentifier + "(key='" + resPath + "')@" + 
kylinConfig.getMetadataUrl();
     }
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
index cdf5eb4..7cb8ca6 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
@@ -202,4 +202,19 @@ abstract public class PushdownResourceStore extends 
ResourceStore {
             logger.debug("{} is not exists in the file system.", path);
         }
     }
+
+    protected void updateTimestampPushdown(String resPath, long timestamp) 
throws IOException {
+        updateTimestampPushdownFile(pushdownPath(resPath), timestamp);
+    }
+
+    private void updateTimestampPushdownFile(Path path, long timestamp) throws 
IOException {
+        FileSystem fileSystem = pushdownFS();
+
+        if (fileSystem.exists(path)) {
+            fileSystem.setTimes(path, timestamp, -1);
+            logger.debug("Update temp file timestamp success. Temp file: {} 
.", path);
+        } else {
+            logger.debug("{} is not exists in the file system.", path);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 275d95a..57a6ef0 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -35,7 +35,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -45,6 +44,7 @@ import org.apache.kylin.common.util.OptionsHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -85,7 +85,7 @@ abstract public class ResourceStore {
 
     private static ResourceStore createResourceStore(KylinConfig kylinConfig) {
         StorageURL metadataUrl = kylinConfig.getMetadataUrl();
-        logger.info("Using metadata url " + metadataUrl + " for resource 
store");
+        logger.info("Using metadata url {} for resource store", metadataUrl);
         String clsName = 
kylinConfig.getResourceStoreImpls().get(metadataUrl.getScheme());
         try {
             Class<? extends ResourceStore> cls = ClassUtil.forName(clsName, 
ResourceStore.class);
@@ -142,7 +142,8 @@ abstract public class ResourceStore {
     /**
      * Collect resources recursively under a folder, return empty list if 
folder does not exist
      */
-    final public List<String> collectResourceRecursively(final String 
folderPath, final String suffix) throws IOException {
+    final public List<String> collectResourceRecursively(final String 
folderPath, final String suffix)
+            throws IOException {
         return new ExponentialBackoffRetry(this).doWithRetry(new 
Callable<List<String>>() {
             @Override
             public List<String> call() throws Exception {
@@ -215,7 +216,7 @@ abstract public class ResourceStore {
      * NOTE: Exceptions thrown by ContentReader are swallowed in order to load 
every resource at best effort.
      */
     final public <T extends RootPersistentEntity> List<T> 
getAllResources(final String folderPath,
-                                                                          
final boolean recursive, final VisitFilter filter, final ContentReader<T> 
reader) throws IOException {
+            final boolean recursive, final VisitFilter filter, final 
ContentReader<T> reader) throws IOException {
 
         return new ExponentialBackoffRetry(this).doWithRetry(new 
Callable<List<T>>() {
             @Override
@@ -319,7 +320,7 @@ abstract public class ResourceStore {
      * @return bytes written
      */
     final public <T extends RootPersistentEntity> long putResource(String 
resPath, T obj, long ts,
-                                                                   
Serializer<T> serializer) throws IOException {
+            Serializer<T> serializer) throws IOException {
         resPath = norm(resPath);
         obj.setLastModified(ts);
         ContentWriter writer = ContentWriter.create(obj, serializer);
@@ -353,7 +354,7 @@ abstract public class ResourceStore {
     }
 
     private void putResourceCheckpoint(String resPath, ContentWriter content, 
long ts) throws IOException {
-        logger.trace("Directly saving resource " + resPath + " (Store " + 
kylinConfig.getMetadataUrl() + ")");
+        logger.trace("Directly saving resource {} (Store {})", resPath, 
kylinConfig.getMetadataUrl());
         beforeChange(resPath);
         putResourceWithRetry(resPath, content, ts);
     }
@@ -375,8 +376,8 @@ abstract public class ResourceStore {
     /**
      * check & set, overwrite a resource
      */
-    final public <T extends RootPersistentEntity> void 
checkAndPutResource(String resPath, T obj, Serializer<T> serializer)
-            throws IOException, WriteConflictException {
+    final public <T extends RootPersistentEntity> void 
checkAndPutResource(String resPath, T obj,
+            Serializer<T> serializer) throws IOException, 
WriteConflictException {
         checkAndPutResource(resPath, obj, System.currentTimeMillis(), 
serializer);
     }
 
@@ -384,9 +385,8 @@ abstract public class ResourceStore {
      * check & set, overwrite a resource
      */
     final public <T extends RootPersistentEntity> void 
checkAndPutResource(String resPath, T obj, long newTS,
-                                                                           
Serializer<T> serializer) throws IOException, WriteConflictException {
+            Serializer<T> serializer) throws IOException, 
WriteConflictException {
         resPath = norm(resPath);
-        //logger.debug("Saving resource " + resPath + " (Store " + 
kylinConfig.getMetadataUrl() + ")");
 
         long oldTS = obj.getLastModified();
         obj.setLastModified(newTS);
@@ -402,10 +402,7 @@ abstract public class ResourceStore {
             obj.setLastModified(confirmedTS); // update again the confirmed TS
             //return confirmedTS;
 
-        } catch (IOException e) {
-            obj.setLastModified(oldTS); // roll back TS when write fail
-            throw e;
-        } catch (RuntimeException e) {
+        } catch (IOException | RuntimeException e) {
             obj.setLastModified(oldTS); // roll back TS when write fail
             throw e;
         }
@@ -434,7 +431,7 @@ abstract public class ResourceStore {
             throws IOException, WriteConflictException;
 
     private long checkAndPutResourceWithRetry(final String resPath, final 
byte[] content, final long oldTS,
-                                              final long newTS) throws 
IOException, WriteConflictException {
+            final long newTS) throws IOException, WriteConflictException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
         return retry.doWithRetry(new Callable<Long>() {
             @Override
@@ -445,20 +442,60 @@ abstract public class ResourceStore {
     }
 
     /**
+     * update resource timestamp to timestamp
+     */
+    final public void updateTimestamp(String resPath, long timestamp) throws 
IOException {
+        logger.trace("Updating resource: {} with timestamp {} (Store {})", 
resPath, timestamp,
+                kylinConfig.getMetadataUrl());
+        updateTimestampCheckPoint(norm(resPath), timestamp);
+    }
+
+    private void updateTimestampCheckPoint(String resPath, long timestamp) 
throws IOException {
+        beforeChange(resPath);
+        updateTimestampWithRetry(resPath, timestamp);
+    }
+
+    private void updateTimestampWithRetry(final String resPath, final long 
timestamp) throws IOException {
+        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
+        retry.doWithRetry(new Callable() {
+            @Override
+            public Object call() throws IOException {
+                updateTimestampImpl(resPath, timestamp);
+                return null;
+            }
+        });
+    }
+
+    abstract protected void updateTimestampImpl(String resPath, long 
timestamp) throws IOException;
+
+    /**
      * delete a resource, does nothing on a folder
      */
     final public void deleteResource(String resPath) throws IOException {
-        logger.trace("Deleting resource " + resPath + " (Store " + 
kylinConfig.getMetadataUrl() + ")");
+        logger.trace("Deleting resource {} (Store {})", resPath, 
kylinConfig.getMetadataUrl());
         deleteResourceCheckpoint(norm(resPath));
     }
 
+    final public void deleteResource(String resPath, long timestamp) throws 
IOException {
+        logger.trace("Deleting resource {} within timestamp {} (Store {})", 
resPath, timestamp,
+                kylinConfig.getMetadataUrl());
+        deleteResourceCheckpoint(norm(resPath), timestamp);
+    }
+
     private void deleteResourceCheckpoint(String resPath) throws IOException {
         beforeChange(resPath);
         deleteResourceWithRetry(resPath);
     }
 
+    private void deleteResourceCheckpoint(String resPath, long timestamp) 
throws IOException {
+        beforeChange(resPath);
+        deleteResourceWithRetry(resPath, timestamp);
+    }
+
     abstract protected void deleteResourceImpl(String resPath) throws 
IOException;
 
+    abstract protected void deleteResourceImpl(String resPath, long timestamp) 
throws IOException;
+
     private void deleteResourceWithRetry(final String resPath) throws 
IOException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
         retry.doWithRetry(new Callable() {
@@ -470,6 +507,26 @@ abstract public class ResourceStore {
         });
     }
 
+    private void deleteResourceWithRetry(final String resPath, final long 
timestamp) throws IOException {
+        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
+        retry.doWithRetry(new Callable() {
+            @Override
+            public Object call() throws IOException {
+                deleteResourceImpl(resPath, timestamp);
+                return null;
+            }
+        });
+    }
+
+    protected boolean checkTimeStampBeforeDelete(long originLastModified, long 
timestamp) {
+        // note here is originLastModified may be 0
+        // 0 means resource doesn't exists in general, it's safe to pass the 
check
+        boolean passCheck = originLastModified <= timestamp;
+        logger.trace("check timestamp before delete: {}, [originLastModified: 
{}, timestamp: {}]", passCheck,
+                originLastModified, timestamp);
+        return passCheck;
+    }
+
     /**
      * called by ExponentialBackoffRetry, to check if an exception is due to 
unreachable server and worth retry
      */
@@ -514,7 +571,7 @@ abstract public class ResourceStore {
             resPath = resPath.substring(1);
         while (resPath.endsWith("/"))
             resPath = resPath.substring(0, resPath.length() - 1);
-        if (resPath.startsWith("/") == false)
+        if (!resPath.startsWith("/"))
             resPath = "/" + resPath;
         return resPath;
     }
@@ -570,7 +627,7 @@ abstract public class ResourceStore {
             checkThread();
 
             for (String resPath : origResData.keySet()) {
-                logger.debug("Rollbacking " + resPath);
+                logger.debug("Rollbacking {}", resPath);
                 try {
                     byte[] data = origResData.get(resPath);
                     Long ts = origResTimestamp.get(resPath);
@@ -663,7 +720,8 @@ abstract public class ResourceStore {
      * Visit all resource under a folder (optionally recursively), without 
loading the content of resource.
      * Low level API, DON'T support ExponentialBackoffRetry, caller should do 
necessary retry
      */
-    final public void visitFolder(String folderPath, boolean recursive, 
VisitFilter filter, Visitor visitor) throws IOException {
+    final public void visitFolder(String folderPath, boolean recursive, 
VisitFilter filter, Visitor visitor)
+            throws IOException {
         visitFolderInner(folderPath, recursive, filter, false, visitor);
     }
 
@@ -671,12 +729,14 @@ abstract public class ResourceStore {
      * Visit all resource and their content under a folder (optionally 
recursively).
      * Low level API, DON'T support ExponentialBackoffRetry, caller should do 
necessary retry
      */
-    final public void visitFolderAndContent(String folderPath, boolean 
recursive, VisitFilter filter, Visitor visitor) throws IOException {
+    final public void visitFolderAndContent(String folderPath, boolean 
recursive, VisitFilter filter, Visitor visitor)
+            throws IOException {
         visitFolderInner(folderPath, recursive, filter, true, visitor);
     }
 
     // Low level API, DON'T support ExponentialBackoffRetry, caller should do 
necessary retry
-    private void visitFolderInner(String folderPath, boolean recursive, 
VisitFilter filter, boolean loadContent, Visitor visitor) throws IOException {
+    private void visitFolderInner(String folderPath, boolean recursive, 
VisitFilter filter, boolean loadContent,
+            Visitor visitor) throws IOException {
         if (filter == null)
             filter = new VisitFilter();
 
@@ -700,7 +760,7 @@ abstract public class ResourceStore {
      * NOTE: Broken content exception should be wrapped by RawResource, and 
return to caller to decide how to handle.
      */
     abstract protected void visitFolderImpl(String folderPath, boolean 
recursive, VisitFilter filter,
-                                            boolean loadContent, Visitor 
visitor) throws IOException;
+            boolean loadContent, Visitor visitor) throws IOException;
 
     public static String dumpResources(KylinConfig kylinConfig, 
Collection<String> dumpList) throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
@@ -729,7 +789,7 @@ abstract public class ResourceStore {
             metaDirURI = "file://" + metaDirURI;
         else
             metaDirURI = "file:///" + metaDirURI;
-        logger.info("meta dir is: " + metaDirURI);
+        logger.info("meta dir is: {}", metaDirURI);
 
         return metaDirURI;
     }
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
 
b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index 828a935..8cd181a 100644
--- 
a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++ 
b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -63,6 +63,7 @@ public class ResourceStoreTest {
     private static void testAStore(ResourceStore store) throws IOException {
         testBasics(store);
         testGetAllResources(store);
+        testUpdateResourceTimestamp(store);
     }
 
     private static void testPerformance(ResourceStore store) throws 
IOException {
@@ -113,13 +114,25 @@ public class ResourceStoreTest {
         String dir2 = "/table";
         String path2 = "/table/_test.json";
         StringEntity content2 = new StringEntity("something");
+        String dir3 = "/model_desc";
+        String path3 = "/model_desc/_test.json";
+        StringEntity content3 = new StringEntity("test check timestamp before 
delete");
 
         // cleanup legacy if any
         store.deleteResource(path1);
         store.deleteResource(path2);
+        store.deleteResource(path3);
 
         StringEntity t;
 
+        // get non-exist
+        assertNull(store.getResource(path1));
+        assertNull(store.getResource(path1, StringEntity.serializer));
+        assertNull(store.getResource(path2));
+        assertNull(store.getResource(path2, StringEntity.serializer));
+        assertNull(store.getResource(path3));
+        assertNull(store.getResource(path3, StringEntity.serializer));
+
         // put/get
         store.checkAndPutResource(path1, content1, StringEntity.serializer);
         assertTrue(store.exists(path1));
@@ -144,39 +157,86 @@ public class ResourceStoreTest {
             // expected
         }
 
+        // put path3
+        store.checkAndPutResource(path3, content3, StringEntity.serializer);
+        assertTrue(store.exists(path3));
+        t = store.getResource(path3, StringEntity.serializer);
+        assertEquals(content3, t);
+
         // list
-        NavigableSet<String> list = null;
+        NavigableSet<String> list;
 
         list = store.listResources(dir1);
-        System.out.println(list);
         assertTrue(list.contains(path1));
-        assertTrue(list.contains(path2) == false);
+        assertTrue(!list.contains(path2));
+        assertTrue(!list.contains(path3));
 
         list = store.listResources(dir2);
         assertTrue(list.contains(path2));
-        assertTrue(list.contains(path1) == false);
+        assertTrue(!list.contains(path1));
+        assertTrue(!list.contains(path3));
+
+        list = store.listResources(dir3);
+        assertTrue(list.contains(path3));
+        assertTrue(!list.contains(path1));
+        assertTrue(!list.contains(path2));
 
         list = store.listResources("/");
         assertTrue(list.contains(dir1));
         assertTrue(list.contains(dir2));
-        assertTrue(list.contains(path1) == false);
-        assertTrue(list.contains(path2) == false);
+        assertTrue(list.contains(dir3));
+        assertTrue(!list.contains(path1));
+        assertTrue(!list.contains(path2));
+        assertTrue(!list.contains(path3));
 
         list = store.listResources(path1);
         assertNull(list);
         list = store.listResources(path2);
         assertNull(list);
+        list = store.listResources(path3);
+        assertNull(list);
 
         // delete/exist
         store.deleteResource(path1);
-        assertTrue(store.exists(path1) == false);
+        assertTrue(!store.exists(path1));
         list = store.listResources(dir1);
-        assertTrue(list == null || list.contains(path1) == false);
+        assertTrue(list == null || !list.contains(path1));
 
         store.deleteResource(path2);
-        assertTrue(store.exists(path2) == false);
+        assertTrue(!store.exists(path2));
         list = store.listResources(dir2);
-        assertTrue(list == null || list.contains(path2) == false);
+        assertTrue(list == null || !list.contains(path2));
+
+        long origLastModified = store.getResourceTimestamp(path3);
+        long beforeLastModified = origLastModified - 100;
+
+        //  beforeLastModified < origLastModified  ==> not delete expected
+        store.deleteResource(path3, beforeLastModified);
+        assertTrue(store.exists(path3));
+        list = store.listResources(dir3);
+        assertTrue(list != null && list.contains(path3));
+
+        //  beforeLastModified = origLastModified  ==> delete expected
+        store.deleteResource(path3, origLastModified);
+        assertTrue(!store.exists(path3));
+        list = store.listResources(dir3);
+        assertTrue(list == null || !list.contains(path3));
+
+        // put again
+        content3 = new StringEntity("test check timestamp before delete new");
+        store.checkAndPutResource(path3, content3, StringEntity.serializer);
+        assertTrue(store.exists(path3));
+        t = store.getResource(path3, StringEntity.serializer);
+        assertEquals(content3, t);
+
+        origLastModified = store.getResourceTimestamp(path3);
+        long afterLastModified = origLastModified + 100;
+
+        // afterLastModified > origLastModified ==> delete expected
+        store.deleteResource(path3, afterLastModified);
+        assertTrue(!store.exists(path3));
+        list = store.listResources(dir3);
+        assertTrue(list == null || !list.contains(path3));
     }
 
     private static long testWritePerformance(ResourceStore store) throws 
IOException {
@@ -208,4 +268,44 @@ public class ResourceStoreTest {
         return oldUrl;
     }
 
+    private static void testUpdateResourceTimestamp(ResourceStore store) 
throws IOException {
+        String dir1 = "/cube";
+        String path1 = "/cube/_test.json";
+        StringEntity content1 = new StringEntity("test update timestamp");
+        // cleanup legacy if any
+        store.deleteResource(path1);
+
+        // get non-exist
+        assertNull(store.getResource(path1));
+        assertNull(store.getResource(path1, StringEntity.serializer));
+
+        // put/get
+        StringEntity t;
+        store.checkAndPutResource(path1, content1, StringEntity.serializer);
+        assertTrue(store.exists(path1));
+        t = store.getResource(path1, StringEntity.serializer);
+        assertEquals(content1, t);
+
+        long oldTS = store.getResourceTimestamp(path1);
+        long newTS = oldTS + 1000;
+        // update timestamp to newTS
+        store.updateTimestamp(path1, newTS);
+
+        long updatedTS = store.getResourceTimestamp(path1);
+        assertEquals(updatedTS, newTS);
+
+        newTS = 0L;
+        store.updateTimestamp(path1, newTS);
+        updatedTS = store.getResourceTimestamp(path1);
+        assertEquals(updatedTS, newTS);
+
+        // delete/exist
+        NavigableSet<String> list;
+        store.deleteResource(path1);
+        assertTrue(!store.exists(path1));
+        list = store.listResources(dir1);
+        assertTrue(list == null || !list.contains(path1));
+
+    }
+
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index b2af656..bd9832f 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -330,7 +330,8 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-    public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String 
lookupTableName, String newSnapshotResPath) throws IOException {
+    public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String 
lookupTableName, String newSnapshotResPath)
+            throws IOException {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             cube = cube.latestCopyForWrite();
 
@@ -419,7 +420,7 @@ public class CubeManager implements IRealizationProvider {
         }
 
         if (update.getUpdateTableSnapshotPath() != null) {
-            for(Map.Entry<String, String> lookupSnapshotPathEntry : 
update.getUpdateTableSnapshotPath().entrySet()) {
+            for (Map.Entry<String, String> lookupSnapshotPathEntry : 
update.getUpdateTableSnapshotPath().entrySet()) {
                 cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), 
lookupSnapshotPathEntry.getValue());
             }
         }
@@ -444,7 +445,8 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-    private void processToRemoveSegments(CubeUpdate update, 
Segments<CubeSegment> newSegs, List<String> toRemoveResources) {
+    private void processToRemoveSegments(CubeUpdate update, 
Segments<CubeSegment> newSegs,
+            List<String> toRemoveResources) {
         Iterator<CubeSegment> iterator = newSegs.iterator();
         while (iterator.hasNext()) {
             CubeSegment currentSeg = iterator.next();
@@ -460,7 +462,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     // for test
-    CubeInstance reloadCube(String cubeName) {
+    public CubeInstance reloadCube(String cubeName) {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             return crud.reload(cubeName);
         }
@@ -522,7 +524,8 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc 
join, SnapshotTableDesc snapshotTableDesc) {
+    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc 
join,
+            SnapshotTableDesc snapshotTableDesc) {
         String tableName = join.getPKSide().getTableIdentity();
         String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, 
snapshotTableDesc);
         String[] pkCols = join.getPrimaryKey();
@@ -537,11 +540,12 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-    private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String 
tableName, SnapshotTableDesc snapshotTableDesc) {
+    private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String 
tableName,
+            SnapshotTableDesc snapshotTableDesc) {
         String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, 
snapshotTableDesc);
 
-        ExtTableSnapshotInfo extTableSnapshot = 
ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
-                snapshotResPath);
+        ExtTableSnapshotInfo extTableSnapshot = 
ExtTableSnapshotInfoManager.getInstance(config)
+                .getSnapshot(snapshotResPath);
         TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, 
cubeSegment.getProject());
         return LookupProviderFactory.getExtLookupTable(tableDesc, 
extTableSnapshot);
     }
@@ -874,8 +878,7 @@ public class CubeManager implements IRealizationProvider {
                 if (pair == null)
                     throw new IllegalArgumentException(
                             "Find no segments to merge by " + tsRange + " for 
cube " + cubeCopy);
-                segRange = new 
SegmentRange(pair.getFirst().getSegRange().start,
-                        pair.getSecond().getSegRange().end);
+                segRange = new 
SegmentRange(pair.getFirst().getSegRange().start, 
pair.getSecond().getSegRange().end);
             }
             return segRange;
         }
@@ -931,9 +934,8 @@ public class CubeManager implements IRealizationProvider {
                                 cubeCopy.toString(), newSegCopy.toString()));
 
             if (StringUtils.isBlank(newSegCopy.getLastBuildJobID()))
-                throw new IllegalStateException(
-                        String.format(Locale.ROOT, "For cube %s, segment %s 
missing LastBuildJobID",
-                                cubeCopy.toString(), newSegCopy.toString()));
+                throw new IllegalStateException(String.format(Locale.ROOT,
+                        "For cube %s, segment %s missing LastBuildJobID", 
cubeCopy.toString(), newSegCopy.toString()));
 
             if (isReady(newSegCopy) == true) {
                 logger.warn("For cube {}, segment {} state should be NEW but 
is READY", cubeCopy, newSegCopy);
@@ -985,9 +987,9 @@ public class CubeManager implements IRealizationProvider {
             CubeSegment[] optSegCopy = 
cubeCopy.regetSegments(optimizedSegments);
 
             if (cubeCopy.getSegments().size() != optSegCopy.length * 2) {
-                throw new IllegalStateException(
-                        String.format(Locale.ROOT, "For cube %s, every READY 
segment should be optimized and all segments should be READY before optimizing",
-                                cubeCopy.toString()));
+                throw new IllegalStateException(String.format(Locale.ROOT,
+                        "For cube %s, every READY segment should be optimized 
and all segments should be READY before optimizing",
+                        cubeCopy.toString()));
             }
 
             CubeSegment[] originalSegments = new 
CubeSegment[optSegCopy.length];
@@ -1001,15 +1003,14 @@ public class CubeManager implements 
IRealizationProvider {
                                     cubeCopy.toString(), seg.toString()));
 
                 if (StringUtils.isBlank(seg.getLastBuildJobID()))
-                    throw new IllegalStateException(
-                            String.format(Locale.ROOT, "For cube %s, segment 
%s missing LastBuildJobID",
-                                    cubeCopy.toString(), seg.toString()));
+                    throw new IllegalStateException(String.format(Locale.ROOT,
+                            "For cube %s, segment %s missing LastBuildJobID", 
cubeCopy.toString(), seg.toString()));
 
                 seg.setStatus(SegmentStatusEnum.READY);
             }
 
-            logger.info("Promoting cube {}, new segments {}, to remove 
segments {}",
-                         cubeCopy, Arrays.toString(optSegCopy), 
originalSegments);
+            logger.info("Promoting cube {}, new segments {}, to remove 
segments {}", cubeCopy,
+                    Arrays.toString(optSegCopy), originalSegments);
 
             CubeUpdate update = new CubeUpdate(cubeCopy);
             update.setToRemoveSegs(originalSegments) //
@@ -1026,9 +1027,9 @@ public class CubeManager implements IRealizationProvider {
             List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
             List<CubeSegment> newList = Arrays.asList(newSegments);
             if (tobe.containsAll(newList) == false) {
-                throw new IllegalStateException(
-                        String.format(Locale.ROOT, "For cube %s, the new 
segments %s do not fit in its current %s; the resulted tobe is %s",
-                                cube.toString(), newList.toString(), 
cube.getSegments().toString(), tobe.toString()));
+                throw new IllegalStateException(String.format(Locale.ROOT,
+                        "For cube %s, the new segments %s do not fit in its 
current %s; the resulted tobe is %s",
+                        cube.toString(), newList.toString(), 
cube.getSegments().toString(), tobe.toString()));
             }
         }
 
@@ -1171,7 +1172,8 @@ public class CubeManager implements IRealizationProvider {
             return (Dictionary<String>) info.getDictionaryObject();
         }
 
-        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String 
lookupTable, String uuid) throws IOException {
+        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String 
lookupTable, String uuid)
+                throws IOException {
             // work on copy instead of cached objects
             CubeInstance cubeCopy = 
cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
             CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java 
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 1bec02a..67b7bdb 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -20,18 +20,24 @@ package org.apache.kylin.cube.cli;
 
 import java.io.IOException;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
 import org.apache.kylin.dict.DictionaryProvider;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -43,7 +49,8 @@ import com.google.common.collect.Sets;
 
 public class DictionaryGeneratorCLI {
 
-    private DictionaryGeneratorCLI(){}
+    private DictionaryGeneratorCLI() {
+    }
 
     private static final Logger logger = 
LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
 
@@ -52,7 +59,26 @@ public class DictionaryGeneratorCLI {
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeSegment segment = cube.getSegmentById(segmentID);
 
-        processSegment(config, segment, uuid, factTableValueProvider, 
dictProvider);
+        int retryTime = 0;
+        while (retryTime < 3) {
+            if (retryTime > 0) {
+                logger.info("Rebuild dictionary and snapshot for Cube: {}, 
Segment: {}, {} times.", cubeName, segmentID,
+                        retryTime);
+            }
+
+            processSegment(config, segment, uuid, factTableValueProvider, 
dictProvider);
+
+            if (isAllDictsAndSnapshotsReady(config, cubeName, segmentID)) {
+                break;
+            }
+            retryTime++;
+        }
+
+        if (retryTime >= 3) {
+            logger.error("Not all dictionaries and snapshots ready for cube 
segment: {}", segmentID);
+        } else {
+            logger.info("Succeed to build all dictionaries and snapshots for 
cube segment: {}", segmentID);
+        }
     }
 
     private static void processSegment(KylinConfig config, CubeSegment 
cubeSeg, String uuid,
@@ -113,4 +139,51 @@ public class DictionaryGeneratorCLI {
         }
     }
 
+    private static boolean isAllDictsAndSnapshotsReady(KylinConfig config, 
String cubeName, String segmentID) {
+        CubeInstance cube = 
CubeManager.getInstance(config).reloadCube(cubeName);
+        CubeSegment segment = cube.getSegmentById(segmentID);
+        ResourceStore store = ResourceStore.getStore(config);
+
+        // check dicts
+        logger.info("Begin to check if all dictionaries exist of Segment: {}", 
segmentID);
+        Map<String, String> dictionaries = segment.getDictionaries();
+        for (Map.Entry<String, String> entry : dictionaries.entrySet()) {
+            String dictResPath = entry.getValue();
+            String dictKey = entry.getKey();
+            try {
+                DictionaryInfo dictInfo = store.getResource(dictResPath, 
DictionaryInfoSerializer.INFO_SERIALIZER);
+                if (dictInfo == null) {
+                    logger.warn("Dictionary=[key: {}, resource path: {}] 
doesn't exist in resource store", dictKey,
+                            dictResPath);
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.warn("Dictionary=[key: {}, path: {}] failed to check, 
details: {}", dictKey, dictResPath, e);
+                return false;
+            }
+        }
+
+        // check snapshots
+        logger.info("Begin to check if all snapshots exist of Segment: {}", 
segmentID);
+        Map<String, String> snapshots = segment.getSnapshots();
+        for (Map.Entry<String, String> entry : snapshots.entrySet()) {
+            String snapshotKey = entry.getKey();
+            String snapshotResPath = entry.getValue();
+            try {
+                SnapshotTable snapshot = store.getResource(snapshotResPath, 
SnapshotTableSerializer.INFO_SERIALIZER);
+                if (snapshot == null) {
+                    logger.info("SnapshotTable=[key: {}, resource path: {}] 
doesn't exist in resource store",
+                            snapshotKey, snapshotResPath);
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.warn("SnapshotTable=[key: {}, resource path: {}]  
failed to check, details: {}", snapshotKey,
+                        snapshotResPath, e);
+                return false;
+            }
+        }
+
+        logger.info("All dictionaries and snapshots exist checking succeed for 
Cube Segment: {}", segmentID);
+        return true;
+    }
 }
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 016700f..ffee105 100755
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,12 +18,13 @@
 
 package org.apache.kylin.dict;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ClassUtil;
@@ -36,11 +37,12 @@ import 
org.apache.kylin.source.IReadableTable.TableSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
 
 public class DictionaryManager {
 
@@ -69,8 +71,8 @@ public class DictionaryManager {
                 .removalListener(new RemovalListener<String, DictionaryInfo>() 
{
                     @Override
                     public void onRemoval(RemovalNotification<String, 
DictionaryInfo> notification) {
-                        DictionaryManager.logger.info("Dict with resource path 
" + notification.getKey()
-                                + " is removed due to " + 
notification.getCause());
+                        DictionaryManager.logger.info("Dict with resource path 
{} is removed due to {}",
+                                notification.getKey(), 
notification.getCause());
                     }
                 })//
                 .maximumSize(config.getCachedDictMaxEntrySize())//
@@ -134,10 +136,11 @@ public class DictionaryManager {
                 largestDictInfo = 
getDictionaryInfo(largestDictInfo.getResourcePath());
                 Dictionary<String> largestDictObject = 
largestDictInfo.getDictionaryObject();
                 if (largestDictObject.contains(newDict)) {
-                    logger.info("dictionary content " + newDict + ", is 
contained by  dictionary at " + largestDictInfo.getResourcePath());
+                    logger.info("dictionary content {}, is contained by  
dictionary at {}", newDict,
+                            largestDictInfo.getResourcePath());
                     return largestDictInfo;
                 } else if (newDict.contains(largestDictObject)) {
-                    logger.info("dictionary content " + newDict + " is by far 
the largest, save it");
+                    logger.info("dictionary content {} is by far the largest, 
save it", newDict);
                     return saveNewDict(newDictInfo);
                 } else {
                     logger.info("merge dict and save...");
@@ -148,17 +151,18 @@ public class DictionaryManager {
                 return saveNewDict(newDictInfo);
             }
         } else {
-            String dupDict = checkDupByContent(newDictInfo, newDict);
+            DictionaryInfo dupDict = checkDupByContent(newDictInfo, newDict);
             if (dupDict != null) {
-                logger.info("Identical dictionary content, reuse existing 
dictionary at " + dupDict);
-                return getDictionaryInfo(dupDict);
+                logger.info("Identical dictionary content, reuse existing 
dictionary at {}", dupDict.getResourcePath());
+                dupDict = 
updateExistingDictLastModifiedTime(dupDict.getResourcePath());
+                return dupDict;
             }
 
             return saveNewDict(newDictInfo);
         }
     }
 
-    private String checkDupByContent(DictionaryInfo dictInfo, 
Dictionary<String> dict) throws IOException {
+    private DictionaryInfo checkDupByContent(DictionaryInfo dictInfo, 
Dictionary<String> dict) throws IOException {
         ResourceStore store = getStore();
         NavigableSet<String> existings = 
store.listResources(dictInfo.getResourceDir());
         if (existings == null)
@@ -170,19 +174,33 @@ public class DictionaryManager {
         }
 
         for (String existing : existings) {
-            DictionaryInfo existingInfo = getDictionaryInfo(existing);
-            if (existingInfo != null) {
-                if ((config.isDictResuable() && 
existingInfo.getDictionaryObject().contains(dict))
-                        || dict.equals(existingInfo.getDictionaryObject())) {
-                    return existing;
+            try {
+                if (existing.endsWith(".dict")) {
+                    DictionaryInfo existingInfo = getDictionaryInfo(existing);
+                    if (existingInfo != null && 
dict.equals(existingInfo.getDictionaryObject())) {
+                        return existingInfo;
+                    }
                 }
-                
+            } catch (Exception ex) {
+                logger.error("Tolerate exception checking dup dictionary " + 
existing, ex);
             }
         }
 
         return null;
     }
 
+    private DictionaryInfo updateExistingDictLastModifiedTime(String dictPath) 
throws IOException {
+        ResourceStore store = getStore();
+        if (StringUtils.isBlank(dictPath))
+            return NONE_INDICATOR;
+        long now = System.currentTimeMillis();
+        store.updateTimestamp(dictPath, now);
+        logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, 
now);
+        DictionaryInfo dictInfo = load(dictPath, true);
+        updateDictCache(dictInfo);
+        return dictInfo;
+    }
+
     private void initDictInfo(Dictionary<String> newDict, DictionaryInfo 
newDictInfo) {
         newDictInfo.setCardinality(newDict.getSize());
         newDictInfo.setDictionaryObject(newDict);
@@ -190,16 +208,14 @@ public class DictionaryManager {
     }
 
     private DictionaryInfo saveNewDict(DictionaryInfo newDictInfo) throws 
IOException {
-
         save(newDictInfo);
-        dictCache.put(newDictInfo.getResourcePath(), newDictInfo);
-
+        updateDictCache(newDictInfo);
         return newDictInfo;
     }
 
     public DictionaryInfo mergeDictionary(List<DictionaryInfo> dicts) throws 
IOException {
 
-        if (dicts.size() == 0)
+        if (dicts.isEmpty())
             return null;
 
         if (dicts.size() == 1)
@@ -209,7 +225,7 @@ public class DictionaryManager {
          * AppendTrieDictionary needn't merge
          * more than one AppendTrieDictionary will generate when user use 
{@link SegmentAppendTrieDictBuilder}
          */
-        for (DictionaryInfo dict: dicts) {
+        for (DictionaryInfo dict : dicts) {
             if 
(dict.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
                 return dict;
             }
@@ -224,7 +240,8 @@ public class DictionaryManager {
             } else {
                 if (!firstDictInfo.isDictOnSameColumn(info)) {
                     // don't throw exception, just output warning as legacy 
cube segment may build dict on PK
-                    logger.warn("Merging dictionaries are not structurally 
equal : " + firstDictInfo.getResourcePath() + " and " + info.getResourcePath());
+                    logger.warn("Merging dictionaries are not structurally 
equal : {} and {}",
+                            firstDictInfo.getResourcePath(), 
info.getResourcePath());
                 }
             }
             totalSize += info.getInput().getSize();
@@ -241,12 +258,6 @@ public class DictionaryManager {
         signature.setLastModifiedTime(System.currentTimeMillis());
         signature.setPath("merged_with_no_original_path");
 
-        //        String dupDict = checkDupByInfo(newDictInfo);
-        //        if (dupDict != null) {
-        //            logger.info("Identical dictionary input " + 
newDictInfo.getInput() + ", reuse existing dictionary at " + dupDict);
-        //            return getDictionaryInfo(dupDict);
-        //        }
-
         //check for cases where merging dicts are actually same
         boolean identicalSourceDicts = true;
         for (int i = 1; i < dicts.size(); ++i) {
@@ -260,7 +271,8 @@ public class DictionaryManager {
             logger.info("Use one of the merging dictionaries directly");
             return dicts.get(0);
         } else {
-            Dictionary<String> newDict = 
DictionaryGenerator.mergeDictionaries(DataType.getType(newDictInfo.getDataType()),
 dicts);
+            Dictionary<String> newDict = DictionaryGenerator
+                    
.mergeDictionaries(DataType.getType(newDictInfo.getDataType()), dicts);
             return trySaveNewDict(newDict, newDictInfo);
         }
     }
@@ -269,33 +281,38 @@ public class DictionaryManager {
         return buildDictionary(col, inpTable, null);
     }
 
-    public DictionaryInfo buildDictionary(TblColRef col, IReadableTable 
inpTable, String builderClass) throws IOException {
-        if (inpTable.exists() == false)
+    public DictionaryInfo buildDictionary(TblColRef col, IReadableTable 
inpTable, String builderClass)
+            throws IOException {
+        if (!inpTable.exists())
             return null;
 
-        logger.info("building dictionary for " + col);
+        logger.info("building dictionary for {}", col);
 
         DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable);
         String dupInfo = checkDupByInfo(dictInfo);
         if (dupInfo != null) {
-            logger.info("Identical dictionary input " + dictInfo.getInput() + 
", reuse existing dictionary at " + dupInfo);
-            return getDictionaryInfo(dupInfo);
+            logger.info("Identical dictionary input {}, reuse existing 
dictionary at {}", dictInfo.getInput(), dupInfo);
+            DictionaryInfo dupDictInfo = 
updateExistingDictLastModifiedTime(dupInfo);
+            return dupDictInfo;
         }
 
-        logger.info("Building dictionary object " + 
JsonUtil.writeValueAsString(dictInfo));
+        logger.info("Building dictionary object {}", 
JsonUtil.writeValueAsString(dictInfo));
 
         Dictionary<String> dictionary;
         dictionary = buildDictFromReadableTable(inpTable, dictInfo, 
builderClass, col);
         return trySaveNewDict(dictionary, dictInfo);
     }
 
-    private Dictionary<String> buildDictFromReadableTable(IReadableTable 
inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws 
IOException {
+    private Dictionary<String> buildDictFromReadableTable(IReadableTable 
inpTable, DictionaryInfo dictInfo,
+            String builderClass, TblColRef col) throws IOException {
         Dictionary<String> dictionary;
         IDictionaryValueEnumerator columnValueEnumerator = null;
         try {
-            columnValueEnumerator = new 
TableColumnValueEnumerator(inpTable.getReader(), 
dictInfo.getSourceColumnIndex());
+            columnValueEnumerator = new 
TableColumnValueEnumerator(inpTable.getReader(),
+                    dictInfo.getSourceColumnIndex());
             if (builderClass == null) {
-                dictionary = 
DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()), 
columnValueEnumerator);
+                dictionary = 
DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()),
+                        columnValueEnumerator);
             } else {
                 IDictionaryBuilder builder = (IDictionaryBuilder) 
ClassUtil.newInstance(builderClass);
                 dictionary = DictionaryGenerator.buildDictionary(builder, 
dictInfo, columnValueEnumerator);
@@ -309,12 +326,14 @@ public class DictionaryManager {
         return dictionary;
     }
 
-    public DictionaryInfo saveDictionary(TblColRef col, IReadableTable 
inpTable, Dictionary<String> dictionary) throws IOException {
+    public DictionaryInfo saveDictionary(TblColRef col, IReadableTable 
inpTable, Dictionary<String> dictionary)
+            throws IOException {
         DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable);
         String dupInfo = checkDupByInfo(dictInfo);
         if (dupInfo != null) {
-            logger.info("Identical dictionary input " + dictInfo.getInput() + 
", reuse existing dictionary at " + dupInfo);
-            return getDictionaryInfo(dupInfo);
+            logger.info("Identical dictionary input {}, reuse existing 
dictionary at {}", dictInfo.getInput(), dupInfo);
+            DictionaryInfo dupDictInfo = 
updateExistingDictLastModifiedTime(dupInfo);
+            return dupDictInfo;
         }
 
         return trySaveNewDict(dictionary, dictInfo);
@@ -331,7 +350,8 @@ public class DictionaryManager {
 
     private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
         final ResourceStore store = getStore();
-        final List<DictionaryInfo> allResources = 
store.getAllResources(dictInfo.getResourceDir(), 
DictionaryInfoSerializer.INFO_SERIALIZER);
+        final List<DictionaryInfo> allResources = 
store.getAllResources(dictInfo.getResourceDir(),
+                DictionaryInfoSerializer.INFO_SERIALIZER);
 
         TableSignature input = dictInfo.getInput();
 
@@ -345,7 +365,8 @@ public class DictionaryManager {
 
     private DictionaryInfo findLargestDictInfo(DictionaryInfo dictInfo) throws 
IOException {
         final ResourceStore store = getStore();
-        final List<DictionaryInfo> allResources = 
store.getAllResources(dictInfo.getResourceDir(), 
DictionaryInfoSerializer.INFO_SERIALIZER);
+        final List<DictionaryInfo> allResources = 
store.getAllResources(dictInfo.getResourceDir(),
+                DictionaryInfoSerializer.INFO_SERIALIZER);
 
         DictionaryInfo largestDict = null;
         for (DictionaryInfo dictionaryInfo : allResources) {
@@ -362,7 +383,7 @@ public class DictionaryManager {
     }
 
     public void removeDictionary(String resourcePath) throws IOException {
-        logger.info("Remvoing dict: " + resourcePath);
+        logger.info("Removing dict: {}", resourcePath);
         ResourceStore store = getStore();
         store.deleteResource(resourcePath);
         dictCache.invalidate(resourcePath);
@@ -385,7 +406,7 @@ public class DictionaryManager {
     void save(DictionaryInfo dict) throws IOException {
         ResourceStore store = getStore();
         String path = dict.getResourcePath();
-        logger.info("Saving dictionary at " + path);
+        logger.info("Saving dictionary at {}", path);
 
         store.putBigResource(path, dict, System.currentTimeMillis(), 
DictionaryInfoSerializer.FULL_SERIALIZER);
     }
@@ -393,11 +414,19 @@ public class DictionaryManager {
     DictionaryInfo load(String resourcePath, boolean loadDictObj) throws 
IOException {
         ResourceStore store = getStore();
 
-        logger.info("DictionaryManager(" + System.identityHashCode(this) + ") 
loading DictionaryInfo(loadDictObj:" + loadDictObj + ") at " + resourcePath);
-        DictionaryInfo info = store.getResource(resourcePath, loadDictObj ? 
DictionaryInfoSerializer.FULL_SERIALIZER : 
DictionaryInfoSerializer.INFO_SERIALIZER);
+        if (loadDictObj) {
+            logger.info("Loading dictionary at {}", resourcePath);
+        }
+
+        DictionaryInfo info = store.getResource(resourcePath,
+                loadDictObj ? DictionaryInfoSerializer.FULL_SERIALIZER : 
DictionaryInfoSerializer.INFO_SERIALIZER);
         return info;
     }
 
+    private void updateDictCache(DictionaryInfo newDictInfo) {
+        dictCache.put(newDictInfo.getResourcePath(), newDictInfo);
+    }
+
     private ResourceStore getStore() {
         return ResourceStore.getStore(config);
     }
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 8e63989..8f68fb0 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -24,7 +24,6 @@ import java.util.NavigableSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.metadata.TableMetadataManager;
@@ -39,6 +38,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.google.common.collect.Lists;
 
 /**
  * @author yangli9
@@ -68,7 +70,8 @@ public class SnapshotManager {
         this.snapshotCache = CacheBuilder.newBuilder().removalListener(new 
RemovalListener<String, SnapshotTable>() {
             @Override
             public void onRemoval(RemovalNotification<String, SnapshotTable> 
notification) {
-                SnapshotManager.logger.info("Snapshot with resource path " + 
notification.getKey() + " is removed due to " + notification.getCause());
+                SnapshotManager.logger.info("Snapshot with resource path {} is 
removed due to {}",
+                        notification.getKey(), notification.getCause());
             }
         }).maximumSize(config.getCachedSnapshotMaxEntrySize())//
                 .expireAfterWrite(1, TimeUnit.DAYS).build(new 
CacheLoader<String, SnapshotTable>() {
@@ -88,8 +91,7 @@ public class SnapshotManager {
         try {
             SnapshotTable r = snapshotCache.get(resourcePath);
             if (r == null) {
-                r = load(resourcePath, true);
-                snapshotCache.put(resourcePath, r);
+                r = loadAndUpdateLocalCache(resourcePath);
             }
             return r;
         } catch (ExecutionException e) {
@@ -97,6 +99,12 @@ public class SnapshotManager {
         }
     }
 
+    private SnapshotTable loadAndUpdateLocalCache(String snapshotResPath) 
throws IOException {
+        SnapshotTable snapshotTable = load(snapshotResPath, true);
+        snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
+        return snapshotTable;
+    }
+
     public List<SnapshotTable> getSnapshots(String tableName, TableSignature 
sourceTableSignature) throws IOException {
         List<SnapshotTable> result = Lists.newArrayList();
         String tableSnapshotsPath = SnapshotTable.getResourceDir(tableName);
@@ -115,34 +123,34 @@ public class SnapshotManager {
         snapshotCache.invalidate(resourcePath);
     }
 
-    public SnapshotTable buildSnapshot(IReadableTable table, TableDesc 
tableDesc, KylinConfig cubeConfig) throws IOException {
+    public SnapshotTable buildSnapshot(IReadableTable table, TableDesc 
tableDesc, KylinConfig cubeConfig)
+            throws IOException {
         SnapshotTable snapshot = new SnapshotTable(table, 
tableDesc.getIdentity());
         snapshot.updateRandomUuid();
+        // KYLIN-3977 review note: a per-call Interners.newWeakInterner() hands out a
+        // fresh lock object on every invocation, so synchronizing on pool.intern(...)
+        // would not serialize concurrent builders of the same table. Lock on the
+        // JVM-wide interned identity string instead (or hoist the interner to a
+        // static final field shared by all calls).
 
-        String dup = checkDupByInfo(snapshot);
-        if (dup != null) {
-            logger.info("Identical input " + table.getSignature() + ", reuse 
existing snapshot at " + dup);
-            return getSnapshotTable(dup);
-        }
+        synchronized (tableDesc.getIdentity().intern()) {
+            SnapshotTable reusableSnapshot = getReusableSnapShot(table, 
snapshot, tableDesc, cubeConfig);
+            if (reusableSnapshot != null)
+                return 
updateDictLastModifiedTime(reusableSnapshot.getResourcePath());
 
-        if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > 
cubeConfig.getTableSnapshotMaxMB()) {
-            throw new IllegalStateException("Table snapshot should be no 
greater than " + cubeConfig.getTableSnapshotMaxMB() //
-                    + " MB, but " + tableDesc + " size is " + 
snapshot.getSignature().getSize());
+            snapshot.takeSnapshot(table, tableDesc);
+            return trySaveNewSnapshot(snapshot);
         }
-
-        snapshot.takeSnapshot(table, tableDesc);
-
-        return trySaveNewSnapshot(snapshot);
     }
 
-    public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc 
tableDesc, String overwriteUUID) throws IOException {
+    public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc 
tableDesc, String overwriteUUID)
+            throws IOException {
         SnapshotTable snapshot = new SnapshotTable(table, 
tableDesc.getIdentity());
         snapshot.setUuid(overwriteUUID);
-
         snapshot.takeSnapshot(table, tableDesc);
 
-        SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath());
-        snapshot.setLastModified(existing.getLastModified());
+        try {
+            SnapshotTable existing = 
getSnapshotTable(snapshot.getResourcePath());
+            snapshot.setLastModified(existing.getLastModified());
+        } catch (Exception ex) {
+            logger.error("Error reading {}, delete it and save rebuild", 
snapshot.getResourcePath(), ex);
+        }
 
         save(snapshot);
         snapshotCache.put(snapshot.getResourcePath(), snapshot);
@@ -150,12 +158,30 @@ public class SnapshotManager {
         return snapshot;
     }
 
+    private SnapshotTable getReusableSnapShot(IReadableTable table, 
SnapshotTable snapshot, TableDesc tableDesc,
+            KylinConfig cubeConfig) throws IOException {
+        String dup = checkDupByInfo(snapshot);
+
+        if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > 
cubeConfig.getTableSnapshotMaxMB()) {
+            throw new IllegalStateException(
+                    "Table snapshot should be no greater than " + 
cubeConfig.getTableSnapshotMaxMB() //
+                            + " MB, but " + tableDesc + " size is " + 
snapshot.getSignature().getSize());
+        }
+
+        if (dup != null) {
+            logger.info("Identical input {}, reuse existing snapshot at {}", 
table.getSignature(), dup);
+            return getSnapshotTable(dup);
+        } else {
+            return null;
+        }
+    }
+
     public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) 
throws IOException {
 
         String dupTable = checkDupByContent(snapshotTable);
         if (dupTable != null) {
-            logger.info("Identical snapshot content " + snapshotTable + ", 
reuse existing snapshot at " + dupTable);
-            return getSnapshotTable(dupTable);
+            logger.info("Identical snapshot content {}, reuse existing 
snapshot at {}", snapshotTable, dupTable);
+            return updateDictLastModifiedTime(dupTable);
         }
 
         save(snapshotTable);
@@ -198,6 +224,16 @@ public class SnapshotManager {
         return null;
     }
 
+    private SnapshotTable updateDictLastModifiedTime(String snapshotPath) 
throws IOException {
+        ResourceStore store = getStore();
+        long now = System.currentTimeMillis();
+        store.updateTimestamp(snapshotPath, now);
+        logger.info("Update snapshotTable {} lastModifiedTime to {}", 
snapshotPath, now);
+
+        // update cache
+        return loadAndUpdateLocalCache(snapshotPath);
+    }
+
     private void save(SnapshotTable snapshot) throws IOException {
         ResourceStore store = getStore();
         String path = snapshot.getResourcePath();
@@ -205,13 +241,14 @@ public class SnapshotManager {
     }
 
     private SnapshotTable load(String resourcePath, boolean loadData) throws 
IOException {
-        logger.info("Loading snapshotTable from " + resourcePath + ", with 
loadData: " + loadData);
+        logger.info("Loading snapshotTable from {}, with loadData: {}", 
resourcePath, loadData);
         ResourceStore store = getStore();
 
-        SnapshotTable table = store.getResource(resourcePath, loadData ? 
SnapshotTableSerializer.FULL_SERIALIZER : 
SnapshotTableSerializer.INFO_SERIALIZER);
+        SnapshotTable table = store.getResource(resourcePath,
+                loadData ? SnapshotTableSerializer.FULL_SERIALIZER : 
SnapshotTableSerializer.INFO_SERIALIZER);
 
         if (loadData)
-            logger.debug("Loaded snapshot at " + resourcePath);
+            logger.debug("Loaded snapshot at {}", resourcePath);
 
         return table;
     }
diff --git 
a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
 
b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
index 6a86e33..bf97cc9 100755
--- 
a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
+++ 
b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.dict;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -45,9 +46,8 @@ public class DictionaryManagerTest extends 
LocalFileMetadataTestCase {
         cleanupTestMetadata();
     }
 
-    
     @Test
-    public void testBuildSaveDictionary() throws IOException {
+    public void testBuildSaveDictionary() throws IOException, 
InterruptedException {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         DictionaryManager dictMgr = DictionaryManager.getInstance(config);
         DataModelManager metaMgr = DataModelManager.getInstance(config);
@@ -57,25 +57,48 @@ public class DictionaryManagerTest extends 
LocalFileMetadataTestCase {
         // non-exist input returns null;
         DictionaryInfo nullInfo = dictMgr.buildDictionary(col, 
MockupReadableTable.newNonExistTable("/a/path"));
         assertEquals(null, nullInfo);
-        
-        DictionaryInfo info1 = dictMgr.buildDictionary(col, 
MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3"));
+
+        DictionaryInfo info1 = dictMgr.buildDictionary(col,
+                MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", 
"3"));
         assertEquals(3, info1.getDictionaryObject().getSize());
 
+        long info1LastModified = info1.getLastModified();
+
         // same input returns same dict
-        DictionaryInfo info2 = dictMgr.buildDictionary(col, 
MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3"));
-        assertTrue(info1 == info2);
-        
+        // sleep 1 second so the reused dict gets a strictly newer timestamp
+        // (the file resource store has second-level timestamp precision)
+        Thread.sleep(1000);
+        DictionaryInfo info2 = dictMgr.buildDictionary(col,
+                MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", 
"3"));
+        assertTrue(info1 != info2);
+        assertEquals(info1.getResourcePath(), info2.getResourcePath());
+
+        // update last modified when reused dict
+        long info2LastModified = info2.getLastModified();
+        assertTrue(info2LastModified > info1LastModified);
+
         // same input values (different path) returns same dict
-        DictionaryInfo info3 = dictMgr.buildDictionary(col, 
MockupReadableTable.newSingleColumnTable("/a/different/path", "1", "2", "3"));
-        assertTrue(info1 == info3);
-        
+        // sleep 1 second so the reused dict gets a strictly newer timestamp
+        // (the file resource store has second-level timestamp precision)
+        Thread.sleep(1000);
+        DictionaryInfo info3 = dictMgr.buildDictionary(col,
+                MockupReadableTable.newSingleColumnTable("/a/different/path", 
"1", "2", "3"));
+        assertTrue(info1 != info3);
+        assertTrue(info2 != info3);
+        assertEquals(info1.getResourcePath(), info3.getResourcePath());
+        assertEquals(info2.getResourcePath(), info3.getResourcePath());
+
+        // update last modified when reused dict
+        long info3LastModified = info3.getLastModified();
+        assertTrue(info3LastModified > info2LastModified);
+
         // save dictionary works in spite of non-exist table
-        Dictionary<String> dict = 
DictionaryGenerator.buildDictionary(col.getType(), new 
IterableDictionaryValueEnumerator("1", "2", "3"));
+        Dictionary<String> dict = 
DictionaryGenerator.buildDictionary(col.getType(),
+                new IterableDictionaryValueEnumerator("1", "2", "3"));
         DictionaryInfo info4 = dictMgr.saveDictionary(col, 
MockupReadableTable.newNonExistTable("/a/path"), dict);
-        assertTrue(info1 == info4);
-        
-        Dictionary<String> dict2 = 
DictionaryGenerator.buildDictionary(col.getType(), new 
IterableDictionaryValueEnumerator("1", "2", "3", "4"));
+        assertEquals(info1.getResourcePath(), info4.getResourcePath());
+
+        Dictionary<String> dict2 = 
DictionaryGenerator.buildDictionary(col.getType(),
+                new IterableDictionaryValueEnumerator("1", "2", "3", "4"));
         DictionaryInfo info5 = dictMgr.saveDictionary(col, 
MockupReadableTable.newNonExistTable("/a/path"), dict2);
-        assertTrue(info1 != info5);
+        assertNotEquals(info1.getResourcePath(), info5.getResourcePath());
     }
 }
diff --git 
a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java
 
b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java
new file mode 100644
index 0000000..ab6ce13
--- /dev/null
+++ 
b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.MockupReadableTable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SnapshotManagerTest extends LocalFileMetadataTestCase {
+
+    private KylinConfig kylinConfig;
+    private SnapshotManager snapshotManager;
+    List<String[]> expect;
+    List<String[]> dif;
+    // test data for scd1
+    List<String[]> contentAtTime1;
+    List<String[]> contentAtTime2;
+    List<String[]> contentAtTime3;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        String[] s1 = new String[] { "1", "CN" };
+        String[] s2 = new String[] { "2", "NA" };
+        String[] s3 = new String[] { "3", "NA" };
+        String[] s4 = new String[] { "4", "KR" };
+        String[] s5 = new String[] { "5", "JP" };
+        String[] s6 = new String[] { "6", "CA" };
+        expect = Lists.newArrayList(s1, s2, s3, s4, s5);
+        dif = Lists.newArrayList(s1, s2, s3, s4, s6);
+
+        contentAtTime1 = Lists.newArrayList(s1, s2, s3, s4, s5, s6);
+        String[] s22 = new String[] { "2", "SP" };
+        contentAtTime2 = Lists.newArrayList(s1, s22, s3, s4, s5);
+        String[] s23 = new String[] { "2", "US" };
+        contentAtTime3 = Lists.newArrayList(s1, s23, s3, s4, s5);
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    private TableDesc genTableDesc(String tableName) {
+        TableDesc table = TableDesc.mockup(tableName);
+        ColumnDesc desc1 = new ColumnDesc("1", "id", "string", null, null, 
null, null);
+        desc1.setId("1");
+        desc1.setDatatype("long");
+        ColumnDesc desc2 = new ColumnDesc("2", "country", "string", null, 
null, null, null);
+        desc2.setId("2");
+        desc2.setDatatype("string");
+        ColumnDesc[] columns = { desc1, desc2 };
+        table.setColumns(columns);
+        table.init(kylinConfig, "default");
+        return table;
+    }
+
+    private IReadableTable genTable(String path, List<String[]> content) {
+        IReadableTable.TableSignature signature = new 
IReadableTable.TableSignature(path, content.size(), 0);
+        return new MockupReadableTable(content, signature, true);
+    }
+
+    @Test
+    public void testCheckByContent() throws IOException, InterruptedException {
+        runTestCase();
+    }
+
+    public void runTestCase() throws IOException, InterruptedException {
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        snapshotManager = SnapshotManager.getInstance(kylinConfig);
+        SnapshotTable origin = 
snapshotManager.buildSnapshot(genTable("./origin", expect), 
genTableDesc("TEST_TABLE"),
+                kylinConfig);
+
+        // sleep 1 second so the reused snapshot gets a strictly newer timestamp
+        // (the file resource store has second-level timestamp precision)
+        SnapshotTable dup = snapshotManager.buildSnapshot(genTable("./dup", 
expect), genTableDesc("TEST_TABLE"),
+                kylinConfig);
+        // assert same snapshot file
+        Assert.assertEquals(origin.getUuid(), dup.getUuid());
+        Assert.assertEquals(origin.getResourcePath(), dup.getResourcePath());
+
+        // assert the file has been updated
+        long originLastModified = origin.getLastModified();
+        long dupLastModified = dup.getLastModified();
+        Assert.assertTrue(dupLastModified > originLastModified);
+
+        SnapshotTable actual = 
snapshotManager.getSnapshotTable(origin.getResourcePath());
+        IReadableTable.TableReader reader = actual.getReader();
+        Assert.assertEquals(expect.size(), actual.getRowCount());
+        int i = 0;
+        while (reader.next()) {
+            Assert.assertEquals(stringJoin(expect.get(i++)), 
stringJoin(reader.getRow()));
+        }
+
+        SnapshotTable difTable = 
snapshotManager.buildSnapshot(genTable("./dif", dif), 
genTableDesc("TEST_TABLE"),
+                kylinConfig);
+        Assert.assertNotEquals(origin.getUuid(), difTable.getUuid());
+    }
+
+    @Test
+    public void testBuildSameSnapshotSameTime() throws InterruptedException, 
IOException {
+        final int threadCount = 3;
+        final ExecutorService executorService = 
Executors.newFixedThreadPool(threadCount);
+        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
+        final TableDesc tableDesc = genTableDesc("TEST_TABLE");
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        snapshotManager = SnapshotManager.getInstance(kylinConfig);
+        ResourceStore store = ResourceStore.getStore(kylinConfig);
+
+        for (int i = 0; i < threadCount; ++i) {
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        snapshotManager.buildSnapshot(genTable("./origin", 
expect), tableDesc, kylinConfig);
+                    } catch (IOException e) {
+                        Assert.fail();
+                    } finally {
+                        countDownLatch.countDown();
+                    }
+                }
+            });
+        }
+        countDownLatch.await();
+        Assert.assertEquals(1, 
store.listResources("/table_snapshot/NULL.TEST_TABLE").size());
+    }
+
+    private String stringJoin(String[] strings) {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < strings.length; i++) {
+            builder.append(strings[i]);
+            if (i < strings.length - 1) {
+                builder.append(",");
+            }
+        }
+        return builder.toString();
+    }
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index aeb7b12..6ab4976 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -91,13 +91,15 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
                 }
                 FileSystem fs = HadoopUtil.getWorkingFileSystem();
 
-                Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, 
col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+                Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
+                        col.getName() + 
FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
                 if (dictFile == null) {
-                    logger.info("Dict for '" + col.getName() + "' not 
pre-built.");
+                    logger.info("Dict for '{}' not pre-built.", col.getName());
                     return null;
                 }
 
-                try (SequenceFile.Reader reader = new 
SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), 
SequenceFile.Reader.file(dictFile))) {
+                try (SequenceFile.Reader reader = new 
SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(),
+                        SequenceFile.Reader.file(dictFile))) {
                     NullWritable key = NullWritable.get();
                     ArrayPrimitiveWritable value = new 
ArrayPrimitiveWritable();
                     reader.next(key, value);
@@ -107,7 +109,7 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
                         String dictClassName = is.readUTF();
                         Dictionary<String> dict = (Dictionary<String>) 
ClassUtil.newInstance(dictClassName);
                         dict.readFields(is);
-                        logger.info("DictionaryProvider read dict from file: " 
+ dictFile);
+                        logger.info("DictionaryProvider read dict from file: 
{}", dictFile);
                         return dict;
                     }
                 }
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java 
b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
index dd84bd6..e432cdb 100755
--- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.cube;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -73,14 +73,17 @@ public class ITDictionaryManagerTest extends 
LocalFileMetadataTestCase {
         DictionaryInfo info1 = dictMgr.buildDictionary(col, 
mockupData.getDistinctValuesFor(col));
         System.out.println(JsonUtil.writeValueAsIndentString(info1));
 
+        Thread.sleep(1000);
+
         DictionaryInfo info2 = dictMgr.buildDictionary(col, 
mockupData.getDistinctValuesFor(col));
         System.out.println(JsonUtil.writeValueAsIndentString(info2));
 
         // test check duplicate
-        assertTrue(info1.getUuid() == info2.getUuid());
-        assertTrue(info1 == 
dictMgr.getDictionaryInfo(info1.getResourcePath()));
-        assertTrue(info2 == 
dictMgr.getDictionaryInfo(info2.getResourcePath()));
-        assertTrue(info1.getDictionaryObject() == info2.getDictionaryObject());
+        assertEquals(info1.getUuid(), info2.getUuid());
+        assertEquals(info1.getResourcePath(), info2.getResourcePath());
+        assertNotEquals(info1.getLastModified(), info2.getLastModified());
+        assertNotEquals(info1, info2);
+        assertEquals(info1.getDictionaryObject(), info2.getDictionaryObject());
 
         // verify dictionary entries
         @SuppressWarnings("unchecked")
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java 
b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index e11fe74..360818b 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -20,13 +20,14 @@ package org.apache.kylin.rest.job;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,12 +40,14 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryInfoSerializer;
 import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class MetadataCleanupJob {
@@ -56,37 +59,39 @@ public class MetadataCleanupJob {
     // 
============================================================================
 
     final KylinConfig config;
-    
-    private List<String> garbageResources = Collections.emptyList();
-    
+
+    private Map<String, Long> garbageResources = Maps.newHashMap();
+    private ResourceStore store;
+
     public MetadataCleanupJob() {
         this(KylinConfig.getInstanceFromEnv());
     }
-    
+
     public MetadataCleanupJob(KylinConfig config) {
         this.config = config;
+        this.store = ResourceStore.getStore(config);
     }
-    
-    public List<String> getGarbageResources() {
+
+    public Map<String, Long> getGarbageResources() {
         return garbageResources;
     }
 
     // function entrance
-    public List<String> cleanup(boolean delete, int jobOutdatedDays) throws 
Exception {
+    public Map<String, Long> cleanup(boolean delete, int jobOutdatedDays) 
throws Exception {
         CubeManager cubeManager = CubeManager.getInstance(config);
-        ResourceStore store = ResourceStore.getStore(config);
         long newResourceTimeCut = System.currentTimeMillis() - 
NEW_RESOURCE_THREADSHOLD_MS;
         FileSystem fs = 
HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
 
-        List<String> toDeleteCandidates = Lists.newArrayList();
+        Map<String, Long> toDeleteCandidates = Maps.newHashMap();
 
         // two level resources, snapshot tables and cube statistics
         for (String resourceRoot : new String[] { 
ResourceStore.SNAPSHOT_RESOURCE_ROOT,
-                ResourceStore.CUBE_STATISTICS_ROOT, 
ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT}) {
+                ResourceStore.CUBE_STATISTICS_ROOT, 
ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT }) {
             for (String dir : noNull(store.listResources(resourceRoot))) {
                 for (String res : noNull(store.listResources(dir))) {
-                    if (store.getResourceTimestamp(res) < newResourceTimeCut)
-                        toDeleteCandidates.add(res);
+                    long timestamp = getTimestamp(res);
+                    if (timestamp < newResourceTimeCut)
+                        toDeleteCandidates.put(res, timestamp);
                 }
             }
         }
@@ -94,14 +99,18 @@ public class MetadataCleanupJob {
         // find all of the global dictionaries in HDFS
         try {
             FileStatus[] fStatus = new FileStatus[0];
-            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new 
Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + 
"resources/GlobalDict/dict")));
-            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new 
Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + 
"resources/SegmentDict/dict")));
+            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(
+                    KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() 
+ "resources/GlobalDict/dict")));
+            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(
+                    KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() 
+ "resources/SegmentDict/dict")));
             for (FileStatus status : fStatus) {
                 String path = status.getPath().toString();
                 FileStatus[] globalDicts = fs.listStatus(new Path(path));
                 for (FileStatus globalDict : globalDicts) {
                     String globalDictPath = globalDict.getPath().toString();
-                    toDeleteCandidates.add(globalDictPath);
+                    long timestamp = getTimestamp(globalDict);
+                    if (timestamp < newResourceTimeCut)
+                        toDeleteCandidates.put(globalDictPath, timestamp);
                 }
             }
         } catch (FileNotFoundException e) {
@@ -113,8 +122,9 @@ public class MetadataCleanupJob {
             for (String dir : noNull(store.listResources(resourceRoot))) {
                 for (String dir2 : noNull(store.listResources(dir))) {
                     for (String res : noNull(store.listResources(dir2))) {
-                        if (store.getResourceTimestamp(res) < 
newResourceTimeCut)
-                            toDeleteCandidates.add(res);
+                        long timestamp = getTimestamp(res);
+                        if (timestamp < newResourceTimeCut)
+                            toDeleteCandidates.put(res, timestamp);
                     }
                 }
             }
@@ -130,13 +140,16 @@ public class MetadataCleanupJob {
                 activeResources.add(segment.getStatisticsResourcePath());
                 for (String dictPath : segment.getDictionaryPaths()) {
                     DictionaryInfo dictInfo = store.getResource(dictPath, 
DictionaryInfoSerializer.FULL_SERIALIZER);
-                    if 
("org.apache.kylin.dict.AppendTrieDictionary".equals(dictInfo != null ? 
dictInfo.getDictionaryClass() : null)){
+                    if ("org.apache.kylin.dict.AppendTrieDictionary"
+                            .equals(dictInfo != null ? 
dictInfo.getDictionaryClass() : null)) {
                         String dictObj = 
dictInfo.getDictionaryObject().toString();
                         String basedir = 
dictObj.substring(dictObj.indexOf("(") + 1, dictObj.indexOf(")") - 1);
-                        if 
(basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() 
+ "/resources/GlobalDict")) {
+                        if (basedir.startsWith(
+                                
KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + 
"/resources/GlobalDict")) {
                             
activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
                                     + "resources/GlobalDict" + 
dictInfo.getResourceDir());
-                        } else if 
(basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() 
+ "/resources/SegmentDict")) {
+                        } else if 
(basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+                                + "/resources/SegmentDict")) {
                             
activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
                                     + "resources/SegmentDict" + 
dictInfo.getResourceDir());
                         }
@@ -144,7 +157,7 @@ public class MetadataCleanupJob {
                 }
             }
         }
-        toDeleteCandidates.removeAll(activeResources);
+        toDeleteCandidates.keySet().removeAll(activeResources);
 
         // delete old and completed jobs
         long outdatedJobTimeCut = System.currentTimeMillis() - jobOutdatedDays 
* 24 * 3600 * 1000L;
@@ -152,49 +165,77 @@ public class MetadataCleanupJob {
         List<ExecutablePO> allExecutable = executableDao.getJobs();
         for (ExecutablePO executable : allExecutable) {
             long lastModified = executable.getLastModified();
-            String jobStatus = 
executableDao.getJobOutput(executable.getUuid()).getStatus();
-
-            if (lastModified < outdatedJobTimeCut && 
(ExecutableState.SUCCEED.toString().equals(jobStatus)
-                    || 
ExecutableState.DISCARDED.toString().equals(jobStatus))) {
-                toDeleteCandidates.add(ResourceStore.EXECUTE_RESOURCE_ROOT + 
"/" + executable.getUuid());
-                
toDeleteCandidates.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + 
executable.getUuid());
+            if (lastModified < outdatedJobTimeCut && 
isJobComplete(executableDao, executable)) {
+                String jobResPath = ResourceStore.EXECUTE_RESOURCE_ROOT + "/" 
+ executable.getUuid();
+                String jobOutputResPath = 
ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid();
+                long outputLastModified = getTimestamp(jobOutputResPath);
+                toDeleteCandidates.put(jobResPath, lastModified);
+                toDeleteCandidates.put(jobOutputResPath, outputLastModified);
 
-                for (ExecutablePO task : executable.getTasks()) {
-                    
toDeleteCandidates.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + 
task.getUuid());
+                List<ExecutablePO> tasks = executable.getTasks();
+                if (tasks != null && !tasks.isEmpty()) {
+                    for (ExecutablePO task : executable.getTasks()) {
+                        String taskId = task.getUuid();
+                        if (StringUtils.isNotBlank(taskId)) {
+                            String resPath = 
ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid();
+                            long timestamp = getTimestamp(resPath);
+                            toDeleteCandidates.put(resPath, timestamp);
+                        }
+                    }
                 }
             }
         }
-        
+
         garbageResources = cleanupConclude(delete, toDeleteCandidates);
         return garbageResources;
     }
 
-    private List<String> cleanupConclude(boolean delete, List<String> 
toDeleteResources) throws IOException {
+    private boolean isJobComplete(ExecutableDao executableDao, ExecutablePO 
job) {
+        String jobId = job.getUuid();
+        boolean isComplete = false;
+        try {
+            ExecutableOutputPO output = executableDao.getJobOutput(jobId);
+            String status = output.getStatus();
+            if (StringUtils.equals(status, ExecutableState.SUCCEED.toString())
+                    || StringUtils.equals(status, 
ExecutableState.DISCARDED.toString())) {
+                isComplete = true;
+            }
+        } catch (PersistentException e) {
+            logger.error("Get job output failed for job uuid: {}", jobId, e);
+            isComplete = true; // job output broken --> will be treated as 
complete
+        }
+
+        return isComplete;
+    }
+
+    private Map<String, Long> cleanupConclude(boolean delete, Map<String, 
Long> toDeleteResources) throws IOException {
         if (toDeleteResources.isEmpty()) {
             logger.info("No metadata resource to clean up");
             return toDeleteResources;
         }
-        
-        logger.info(toDeleteResources.size() + " metadata resource to clean 
up");
+
+        logger.info("{} metadata resource to clean up", 
toDeleteResources.size());
 
         if (delete) {
             ResourceStore store = ResourceStore.getStore(config);
             FileSystem fs = 
HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
-            for (String res : toDeleteResources) {
-                logger.info("Deleting metadata " + res);
+            for (String res : toDeleteResources.keySet()) {
+                long timestamp = toDeleteResources.get(res);
+                logger.info("Deleting metadata=[resource_path: {}, timestamp: 
{}]", res, timestamp);
                 try {
                     if 
(res.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())) {
                         fs.delete(new Path(res), true);
                     } else {
-                        store.deleteResource(res);
+                        store.deleteResource(res, timestamp);
                     }
                 } catch (IOException e) {
-                    logger.error("Failed to delete resource " + res, e);
+                    logger.error("Failed to delete metadata=[resource_path: 
{}, timestamp: {}] ", res, timestamp, e);
                 }
             }
         } else {
-            for (String res : toDeleteResources) {
-                logger.info("Dry run, pending delete metadata " + res);
+            for (String res : toDeleteResources.keySet()) {
+                long timestamp = toDeleteResources.get(res);
+                logger.info("Dry run, pending delete metadata=[resource_path: 
{}, timestamp: {}] ", res, timestamp);
             }
         }
         return toDeleteResources;
@@ -204,4 +245,17 @@ public class MetadataCleanupJob {
         return (list == null) ? new TreeSet<String>() : list;
     }
 
+    private long getTimestamp(String resPath) {
+        long timestamp = Long.MAX_VALUE;
+        try {
+            timestamp = store.getResourceTimestamp(resPath);
+        } catch (IOException e) {
+            logger.warn("Failed to get resource timestamp from remote resource 
store, details:{}", e);
+        }
+        return timestamp;
+    }
+
+    private long getTimestamp(FileStatus filestatus) {
+        return filestatus.getModificationTime();
+    }
 }
diff --git 
a/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java
 
b/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java
index fa85a1e..ad819e6 100644
--- 
a/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java
+++ 
b/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java
@@ -25,7 +25,7 @@ import static 
org.apache.kylin.common.util.LocalFileMetadataTestCase.staticCreat
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
@@ -42,18 +42,22 @@ public class MetadataCleanupJobTest {
 
     @Test
     public void testCleanUp() throws Exception {
-        staticCreateTestMetadata(false, new ResetTimeHook(1, 
"src/test/resources/test_meta"));
+        // file resource store may lose millisecond precision of the 
timestamp, so set last modified to 2000
+        staticCreateTestMetadata(false, new ResetTimeHook(2000, 
"src/test/resources/test_meta"));
         MetadataCleanupJob metadataCleanupJob = new MetadataCleanupJob();
-        List<String> cleanupList = metadataCleanupJob.cleanup(false, 30);
-        Assert.assertEquals(7, cleanupList.size());
+        Map<String, Long> cleanupMap = metadataCleanupJob.cleanup(false, 30);
+        Assert.assertEquals(7, cleanupMap.size());
+        for (long timestamp : cleanupMap.values()) {
+            Assert.assertEquals(2000, timestamp);
+        }
     }
 
     @Test
     public void testNotCleanUp() throws Exception {
         staticCreateTestMetadata(false, new 
ResetTimeHook(System.currentTimeMillis(), "src/test/resources/test_meta"));
         MetadataCleanupJob metadataCleanupJob = new MetadataCleanupJob();
-        List<String> cleanupList = metadataCleanupJob.cleanup(false, 30);
-        Assert.assertEquals(0, cleanupList.size());
+        Map<String, Long> cleanupMap = metadataCleanupJob.cleanup(false, 30);
+        Assert.assertEquals(0, cleanupMap.size());
     }
 
     private class ResetTimeHook extends 
LocalFileMetadataTestCase.OverlayMetaHook {
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 14c5ea7..ecd698f 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -147,7 +147,7 @@ public class HBaseResourceStore extends 
PushdownResourceStore {
 
     @Override
     protected void visitFolderImpl(String folderPath, final boolean recursive, 
VisitFilter filter,
-                                   final boolean loadContent, final Visitor 
visitor) throws IOException {
+            final boolean loadContent, final Visitor visitor) throws 
IOException {
 
         visitFolder(folderPath, filter, loadContent, new FolderVisitor() {
             @Override
@@ -167,7 +167,8 @@ public class HBaseResourceStore extends 
PushdownResourceStore {
         });
     }
 
-    private void visitFolder(String folderPath, VisitFilter filter, boolean 
loadContent, FolderVisitor visitor) throws IOException {
+    private void visitFolder(String folderPath, VisitFilter filter, boolean 
loadContent, FolderVisitor visitor)
+            throws IOException {
         assert folderPath.startsWith("/");
 
         String folderPrefix = folderPath.endsWith("/") ? folderPath : 
folderPath + "/";
@@ -244,7 +245,7 @@ public class HBaseResourceStore extends 
PushdownResourceStore {
                     CompareFilter.CompareOp.LESS, 
Bytes.toBytes(visitFilter.lastModEndExclusive));
             filterList.addFilter(timeEndFilter);
         }
-        return filterList.getFilters().size() == 0 ? null : filterList;
+        return filterList.getFilters().isEmpty() ? null : filterList;
     }
 
     private InputStream getInputStream(String resPath, Result r) throws 
IOException {
@@ -352,18 +353,40 @@ public class HBaseResourceStore extends 
PushdownResourceStore {
     }
 
     @Override
-    protected void deleteResourceImpl(String resPath) throws IOException {
+    protected void updateTimestampImpl(String resPath, long timestamp) throws 
IOException {
         Table table = getConnection().getTable(TableName.valueOf(tableName));
         try {
-            boolean hdfsResourceExist = false;
-            Result result = internalGetFromHTable(table, resPath, true, false);
-            if (result != null) {
-                byte[] value = result.getValue(B_FAMILY, B_COLUMN);
-                if (value != null && value.length == 0) {
-                    hdfsResourceExist = true;
-                }
+            boolean hdfsResourceExist = isHdfsResourceExist(table, resPath);
+            long oldTS = getResourceLastModified(table, resPath);
+
+            byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
+            byte[] row = Bytes.toBytes(resPath);
+            Put put = new Put(row);
+            put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(timestamp));
+
+            boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, 
put);
+            logger.trace("Update row {} from oldTs: {}, to newTs: {}, 
operation result: {}", resPath, oldTS, timestamp,
+                    ok);
+            if (!ok) {
+                long real = getResourceTimestampImpl(resPath);
+                throw new WriteConflictException(
+                        "Overwriting conflict " + resPath + ", expect old TS " 
+ oldTS + ", but it is " + real);
             }
 
+            if (hdfsResourceExist) { // update timestamp in hdfs
+                updateTimestampPushdown(resPath, timestamp);
+            }
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+    }
+
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
+        try {
+            boolean hdfsResourceExist = isHdfsResourceExist(table, resPath);
+
             Delete del = new Delete(Bytes.toBytes(resPath));
             table.delete(del);
 
@@ -376,6 +399,43 @@ public class HBaseResourceStore extends 
PushdownResourceStore {
     }
 
     @Override
+    protected void deleteResourceImpl(String resPath, long timestamp) throws 
IOException {
+        Table table = getConnection().getTable(TableName.valueOf(tableName));
+        try {
+            boolean hdfsResourceExist = isHdfsResourceExist(table, resPath);
+            long origLastModified = getResourceLastModified(table, resPath);
+            if (checkTimeStampBeforeDelete(origLastModified, timestamp)) {
+                Delete del = new Delete(Bytes.toBytes(resPath));
+                table.delete(del);
+
+                if (hdfsResourceExist) { // remove hdfs cell value
+                    deletePushdown(resPath);
+                }
+            }
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+    }
+
+    // avoid getting the Table twice, to improve delete performance
+    private long getResourceLastModified(Table table, String resPath) throws 
IOException {
+        return getTimestamp(internalGetFromHTable(table, resPath, false, 
true));
+    }
+
+    private boolean isHdfsResourceExist(Table table, String resPath) throws 
IOException {
+        boolean hdfsResourceExist = false;
+        Result result = internalGetFromHTable(table, resPath, true, false);
+        if (result != null) {
+            byte[] contentVal = result.getValue(B_FAMILY, B_COLUMN);
+            if (contentVal != null && contentVal.length == 0) {
+                hdfsResourceExist = true;
+            }
+        }
+
+        return hdfsResourceExist;
+    }
+
+    @Override
     protected String getReadableResourcePathImpl(String resPath) {
         return tableName + "(key='" + resPath + "')@" + 
kylinConfig.getMetadataUrl();
     }

Reply via email to