This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new 8857397 #KYLIN-3977 Avoid mistakenly deleting dicts by storage cleanup
while building jobs are running
8857397 is described below
commit 8857397267204491eb83e722399951de7917d167
Author: PENG Zhengshuai <[email protected]>
AuthorDate: Wed May 15 21:39:00 2019 +0800
#KYLIN-3977 Avoid mistakenly deleting dicts by storage cleanup while building
jobs are running
* #KYLIN-3977, Avoid mistakenly deleting dicts by storage cleanup while
building jobs are running
---
.../common/persistence/FileResourceStore.java | 31 +++-
.../common/persistence/HDFSResourceStore.java | 31 +++-
.../kylin/common/persistence/JDBCResourceSQL.java | 6 +-
.../common/persistence/JDBCResourceStore.java | 48 +++++-
.../common/persistence/PushdownResourceStore.java | 15 ++
.../kylin/common/persistence/ResourceStore.java | 106 ++++++++++---
.../common/persistence/ResourceStoreTest.java | 120 +++++++++++++--
.../java/org/apache/kylin/cube/CubeManager.java | 52 ++++---
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 77 +++++++++-
.../org/apache/kylin/dict/DictionaryManager.java | 141 ++++++++++-------
.../apache/kylin/dict/lookup/SnapshotManager.java | 89 +++++++----
.../apache/kylin/dict/DictionaryManagerTest.java | 53 +++++--
.../kylin/dict/lookup/SnapshotManagerTest.java | 171 +++++++++++++++++++++
.../kylin/engine/mr/steps/CreateDictionaryJob.java | 10 +-
.../apache/kylin/cube/ITDictionaryManagerTest.java | 13 +-
.../apache/kylin/rest/job/MetadataCleanupJob.java | 136 +++++++++++-----
.../kylin/rest/job/MetadataCleanupJobTest.java | 16 +-
.../kylin/storage/hbase/HBaseResourceStore.java | 82 ++++++++--
18 files changed, 959 insertions(+), 238 deletions(-)
diff --git
a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
index bccb7a3..813eca3 100644
---
a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
+++
b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java
@@ -43,7 +43,7 @@ public class FileResourceStore extends ResourceStore {
public FileResourceStore(KylinConfig kylinConfig) {
super(kylinConfig);
root = new File(getPath(kylinConfig)).getAbsoluteFile();
- if (root.exists() == false)
+ if (!root.exists())
throw new IllegalArgumentException(
"File not exist by '" + kylinConfig.getMetadataUrl() + "':
" + root.getAbsolutePath());
}
@@ -60,7 +60,7 @@ public class FileResourceStore extends ResourceStore {
@Override
protected void visitFolderImpl(String folderPath, boolean recursive,
VisitFilter filter, boolean loadContent,
- Visitor visitor) throws IOException {
+ Visitor visitor) throws IOException {
if (--failVisitFolderCountDown == 0)
throw new IOException("for test");
@@ -178,6 +178,19 @@ public class FileResourceStore extends ResourceStore {
}
@Override
+ protected void updateTimestampImpl(String resPath, long timestamp) throws
IOException {
+ File f = file(resPath);
+ if (f.exists()) {
+ // note file timestamp may lose precision for last two digits of
timestamp
+ boolean success = f.setLastModified(timestamp);
+ if (!success) {
+ throw new IOException(
+ "Update resource timestamp failed, resPath:" + resPath
+ ", timestamp: " + timestamp);
+ }
+ }
+ }
+
+ @Override
protected void deleteResourceImpl(String resPath) throws IOException {
File f = file(resPath);
@@ -190,6 +203,20 @@ public class FileResourceStore extends ResourceStore {
}
@Override
+ protected void deleteResourceImpl(String resPath, long timestamp) throws
IOException {
+ File f = file(resPath);
+ try {
+ if (f.exists()) {
+ long origLastModified = getResourceTimestampImpl(resPath);
+ if (checkTimeStampBeforeDelete(origLastModified, timestamp))
+ FileUtils.forceDelete(f);
+ }
+ } catch (FileNotFoundException e) {
+ // FileNotFoundException is not a problem in case of delete
+ }
+ }
+
+ @Override
protected String getReadableResourcePathImpl(String resPath) {
return file(resPath).toString();
}
diff --git
a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
index c38a182..03cab1f 100644
---
a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
+++
b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
@@ -65,7 +65,7 @@ public class HDFSResourceStore extends ResourceStore {
fs = HadoopUtil.getFileSystem(path);
Path metadataPath = new Path(path);
- if (fs.exists(metadataPath) == false) {
+ if (!fs.exists(metadataPath)) {
logger.warn("Path not exist in HDFS, create it: {}. ", path);
createMetaFolder(metadataPath);
}
@@ -136,7 +136,7 @@ public class HDFSResourceStore extends ResourceStore {
@Override
protected void visitFolderImpl(String folderPath, boolean recursive,
VisitFilter filter, boolean loadContent,
- Visitor visitor) throws IOException {
+ Visitor visitor) throws IOException {
Path p = getRealHDFSPath(folderPath);
if (!fs.exists(p) || !fs.isDirectory(p)) {
return;
@@ -248,6 +248,18 @@ public class HDFSResourceStore extends ResourceStore {
}
@Override
+ protected void updateTimestampImpl(String resPath, long timestamp) throws
IOException {
+ try {
+ Path p = getRealHDFSPath(resPath);
+ if (fs.exists(p)) {
+ fs.setTimes(p, timestamp, -1);
+ }
+ } catch (Exception e) {
+ throw new IOException("Update resource timestamp fail", e);
+ }
+ }
+
+ @Override
protected void deleteResourceImpl(String resPath) throws IOException {
try {
Path p = getRealHDFSPath(resPath);
@@ -258,6 +270,21 @@ public class HDFSResourceStore extends ResourceStore {
throw new IOException("Delete resource fail", e);
}
}
+
+ @Override
+ protected void deleteResourceImpl(String resPath, long timestamp) throws
IOException {
+ try {
+ Path p = getRealHDFSPath(resPath);
+ if (fs.exists(p)) {
+ long origLastModified =
fs.getFileStatus(p).getModificationTime();
+ if (checkTimeStampBeforeDelete(origLastModified, timestamp))
+ fs.delete(p, true);
+ }
+ } catch (Exception e) {
+ throw new IOException("Delete resource fail", e);
+ }
+ }
+
@Override
protected String getReadableResourcePathImpl(String resPath) {
return getRealHDFSPath(resPath).toString();
diff --git
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java
index 3dc7b65..54233ea 100644
---
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java
+++
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java
@@ -30,7 +30,8 @@ public class JDBCResourceSQL {
final private String metaTableTs;
final private String metaTableContent;
- public JDBCResourceSQL(String dialect, String tableName, String
metaTableKey, String metaTableTs, String metaTableContent) {
+ public JDBCResourceSQL(String dialect, String tableName, String
metaTableKey, String metaTableTs,
+ String metaTableContent) {
this.format =
JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(dialect);
this.tableName = tableName;
this.metaTableKey = metaTableKey;
@@ -96,8 +97,7 @@ public class JDBCResourceSQL {
return sql;
}
- @SuppressWarnings("unused")
- private String getReplaceSqlWithoutContent() {
+ public String getReplaceSqlWithoutContent() {
final String sql = new
MessageFormat(format.getReplaceSqlWithoutContent(), Locale.ROOT)
.format(new Object[] { tableName, metaTableTs, metaTableKey },
new StringBuffer(), new FieldPosition(0))
.toString();
diff --git
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index 9e5a989..0bb8f4c 100644
---
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -128,7 +128,7 @@ public class JDBCResourceStore extends
PushdownResourceStore {
pstat = connection.prepareStatement(createIndexSql);
pstat.executeUpdate();
} catch (SQLException ex) {
- logger.error("Failed to create index on " + META_TABLE_TS,
ex);
+ logger.error("Failed to create index on {}",
META_TABLE_TS, ex);
}
}
@@ -171,7 +171,7 @@ public class JDBCResourceStore extends
PushdownResourceStore {
@Override
protected void visitFolderImpl(final String folderPath, final boolean
recursive, final VisitFilter filter,
- final boolean loadContent, final Visitor
visitor) throws IOException {
+ final boolean loadContent, final Visitor visitor) throws
IOException {
try {
executeSql(new SqlOperation() {
@@ -184,8 +184,8 @@ public class JDBCResourceStore extends
PushdownResourceStore {
lookForPrefix = filter.pathPrefix;
}
- if (isRootPath(folderPath)){
- for (int i=0; i<tableNames.length; i++){
+ if (isRootPath(folderPath)) {
+ for (int i = 0; i < tableNames.length; i++) {
final String tableName = tableNames[i];
JDBCResourceSQL sqls =
getJDBCResourceSQL(tableName);
String sql =
sqls.getAllResourceSqlString(loadContent);
@@ -212,7 +212,7 @@ public class JDBCResourceStore extends
PushdownResourceStore {
}
}
}
- }else{
+ } else {
JDBCResourceSQL sqls =
getJDBCResourceSQL(getMetaTableName(folderPath));
String sql = sqls.getAllResourceSqlString(loadContent);
pstat = connection.prepareStatement(sql);
@@ -502,8 +502,7 @@ public class JDBCResourceStore extends
PushdownResourceStore {
pushdown.close();
}
} else {
- pstat2.setBinaryStream(1,
- new BufferedInputStream(new
ByteArrayInputStream(content)));
+ pstat2.setBinaryStream(1, new
BufferedInputStream(new ByteArrayInputStream(content)));
pstat2.setString(2, resPath);
pstat2.executeUpdate();
}
@@ -517,6 +516,33 @@ public class JDBCResourceStore extends
PushdownResourceStore {
}
@Override
+ protected void updateTimestampImpl(final String resPath, final long
timestamp) throws IOException {
+ try {
+ boolean skipHdfs = isJsonMetadata(resPath);
+ JDBCResourceSQL sqls =
getJDBCResourceSQL(getMetaTableName(resPath));
+ executeSql(new SqlOperation() {
+ @Override
+ public void execute(Connection connection) throws SQLException
{
+ pstat =
connection.prepareStatement(sqls.getReplaceSqlWithoutContent());
+ pstat.setLong(1, timestamp);
+ pstat.setString(2, resPath);
+ pstat.executeUpdate();
+ }
+ });
+
+ if (!skipHdfs) {
+ try {
+ updateTimestampPushdown(resPath, timestamp);
+ } catch (Throwable e) {
+ throw new SQLException(e);
+ }
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
protected void deleteResourceImpl(final String resPath) throws IOException
{
try {
boolean skipHdfs = isJsonMetadata(resPath);
@@ -544,6 +570,14 @@ public class JDBCResourceStore extends
PushdownResourceStore {
}
@Override
+ protected void deleteResourceImpl(String resPath, long timestamp) throws
IOException {
+ // considering deletePushDown operation, check timestamp at the
beginning
+ long origLastModified = getResourceTimestampImpl(resPath);
+ if (checkTimeStampBeforeDelete(origLastModified, timestamp))
+ deleteResourceImpl(resPath);
+ }
+
+ @Override
protected String getReadableResourcePathImpl(String resPath) {
return metadataIdentifier + "(key='" + resPath + "')@" +
kylinConfig.getMetadataUrl();
}
diff --git
a/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
b/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
index cdf5eb4..7cb8ca6 100644
---
a/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
+++
b/core-common/src/main/java/org/apache/kylin/common/persistence/PushdownResourceStore.java
@@ -202,4 +202,19 @@ abstract public class PushdownResourceStore extends
ResourceStore {
logger.debug("{} is not exists in the file system.", path);
}
}
+
+ protected void updateTimestampPushdown(String resPath, long timestamp)
throws IOException {
+ updateTimestampPushdownFile(pushdownPath(resPath), timestamp);
+ }
+
+ private void updateTimestampPushdownFile(Path path, long timestamp) throws
IOException {
+ FileSystem fileSystem = pushdownFS();
+
+ if (fileSystem.exists(path)) {
+ fileSystem.setTimes(path, timestamp, -1);
+ logger.debug("Update temp file timestamp success. Temp file: {}
.", path);
+ } else {
+ logger.debug("{} is not exists in the file system.", path);
+ }
+ }
}
\ No newline at end of file
diff --git
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 275d95a..57a6ef0 100644
---
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -35,7 +35,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
@@ -45,6 +44,7 @@ import org.apache.kylin.common.util.OptionsHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
@@ -85,7 +85,7 @@ abstract public class ResourceStore {
private static ResourceStore createResourceStore(KylinConfig kylinConfig) {
StorageURL metadataUrl = kylinConfig.getMetadataUrl();
- logger.info("Using metadata url " + metadataUrl + " for resource
store");
+ logger.info("Using metadata url {} for resource store", metadataUrl);
String clsName =
kylinConfig.getResourceStoreImpls().get(metadataUrl.getScheme());
try {
Class<? extends ResourceStore> cls = ClassUtil.forName(clsName,
ResourceStore.class);
@@ -142,7 +142,8 @@ abstract public class ResourceStore {
/**
* Collect resources recursively under a folder, return empty list if
folder does not exist
*/
- final public List<String> collectResourceRecursively(final String
folderPath, final String suffix) throws IOException {
+ final public List<String> collectResourceRecursively(final String
folderPath, final String suffix)
+ throws IOException {
return new ExponentialBackoffRetry(this).doWithRetry(new
Callable<List<String>>() {
@Override
public List<String> call() throws Exception {
@@ -215,7 +216,7 @@ abstract public class ResourceStore {
* NOTE: Exceptions thrown by ContentReader are swallowed in order to load
every resource at best effort.
*/
final public <T extends RootPersistentEntity> List<T>
getAllResources(final String folderPath,
-
final boolean recursive, final VisitFilter filter, final ContentReader<T>
reader) throws IOException {
+ final boolean recursive, final VisitFilter filter, final
ContentReader<T> reader) throws IOException {
return new ExponentialBackoffRetry(this).doWithRetry(new
Callable<List<T>>() {
@Override
@@ -319,7 +320,7 @@ abstract public class ResourceStore {
* @return bytes written
*/
final public <T extends RootPersistentEntity> long putResource(String
resPath, T obj, long ts,
-
Serializer<T> serializer) throws IOException {
+ Serializer<T> serializer) throws IOException {
resPath = norm(resPath);
obj.setLastModified(ts);
ContentWriter writer = ContentWriter.create(obj, serializer);
@@ -353,7 +354,7 @@ abstract public class ResourceStore {
}
private void putResourceCheckpoint(String resPath, ContentWriter content,
long ts) throws IOException {
- logger.trace("Directly saving resource " + resPath + " (Store " +
kylinConfig.getMetadataUrl() + ")");
+ logger.trace("Directly saving resource {} (Store {})", resPath,
kylinConfig.getMetadataUrl());
beforeChange(resPath);
putResourceWithRetry(resPath, content, ts);
}
@@ -375,8 +376,8 @@ abstract public class ResourceStore {
/**
* check & set, overwrite a resource
*/
- final public <T extends RootPersistentEntity> void
checkAndPutResource(String resPath, T obj, Serializer<T> serializer)
- throws IOException, WriteConflictException {
+ final public <T extends RootPersistentEntity> void
checkAndPutResource(String resPath, T obj,
+ Serializer<T> serializer) throws IOException,
WriteConflictException {
checkAndPutResource(resPath, obj, System.currentTimeMillis(),
serializer);
}
@@ -384,9 +385,8 @@ abstract public class ResourceStore {
* check & set, overwrite a resource
*/
final public <T extends RootPersistentEntity> void
checkAndPutResource(String resPath, T obj, long newTS,
-
Serializer<T> serializer) throws IOException, WriteConflictException {
+ Serializer<T> serializer) throws IOException,
WriteConflictException {
resPath = norm(resPath);
- //logger.debug("Saving resource " + resPath + " (Store " +
kylinConfig.getMetadataUrl() + ")");
long oldTS = obj.getLastModified();
obj.setLastModified(newTS);
@@ -402,10 +402,7 @@ abstract public class ResourceStore {
obj.setLastModified(confirmedTS); // update again the confirmed TS
//return confirmedTS;
- } catch (IOException e) {
- obj.setLastModified(oldTS); // roll back TS when write fail
- throw e;
- } catch (RuntimeException e) {
+ } catch (IOException | RuntimeException e) {
obj.setLastModified(oldTS); // roll back TS when write fail
throw e;
}
@@ -434,7 +431,7 @@ abstract public class ResourceStore {
throws IOException, WriteConflictException;
private long checkAndPutResourceWithRetry(final String resPath, final
byte[] content, final long oldTS,
- final long newTS) throws
IOException, WriteConflictException {
+ final long newTS) throws IOException, WriteConflictException {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
return retry.doWithRetry(new Callable<Long>() {
@Override
@@ -445,20 +442,60 @@ abstract public class ResourceStore {
}
/**
+ * update resource timestamp to timestamp
+ */
+ final public void updateTimestamp(String resPath, long timestamp) throws
IOException {
+ logger.trace("Updating resource: {} with timestamp {} (Store {})",
resPath, timestamp,
+ kylinConfig.getMetadataUrl());
+ updateTimestampCheckPoint(norm(resPath), timestamp);
+ }
+
+ private void updateTimestampCheckPoint(String resPath, long timestamp)
throws IOException {
+ beforeChange(resPath);
+ updateTimestampWithRetry(resPath, timestamp);
+ }
+
+ private void updateTimestampWithRetry(final String resPath, final long
timestamp) throws IOException {
+ ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
+ retry.doWithRetry(new Callable() {
+ @Override
+ public Object call() throws IOException {
+ updateTimestampImpl(resPath, timestamp);
+ return null;
+ }
+ });
+ }
+
+ abstract protected void updateTimestampImpl(String resPath, long
timestamp) throws IOException;
+
+ /**
* delete a resource, does nothing on a folder
*/
final public void deleteResource(String resPath) throws IOException {
- logger.trace("Deleting resource " + resPath + " (Store " +
kylinConfig.getMetadataUrl() + ")");
+ logger.trace("Deleting resource {} (Store {})", resPath,
kylinConfig.getMetadataUrl());
deleteResourceCheckpoint(norm(resPath));
}
+ final public void deleteResource(String resPath, long timestamp) throws
IOException {
+ logger.trace("Deleting resource {} within timestamp {} (Store {})",
resPath, timestamp,
+ kylinConfig.getMetadataUrl());
+ deleteResourceCheckpoint(norm(resPath), timestamp);
+ }
+
private void deleteResourceCheckpoint(String resPath) throws IOException {
beforeChange(resPath);
deleteResourceWithRetry(resPath);
}
+ private void deleteResourceCheckpoint(String resPath, long timestamp)
throws IOException {
+ beforeChange(resPath);
+ deleteResourceWithRetry(resPath, timestamp);
+ }
+
abstract protected void deleteResourceImpl(String resPath) throws
IOException;
+ abstract protected void deleteResourceImpl(String resPath, long timestamp)
throws IOException;
+
private void deleteResourceWithRetry(final String resPath) throws
IOException {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
retry.doWithRetry(new Callable() {
@@ -470,6 +507,26 @@ abstract public class ResourceStore {
});
}
+ private void deleteResourceWithRetry(final String resPath, final long
timestamp) throws IOException {
+ ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
+ retry.doWithRetry(new Callable() {
+ @Override
+ public Object call() throws IOException {
+ deleteResourceImpl(resPath, timestamp);
+ return null;
+ }
+ });
+ }
+
+ protected boolean checkTimeStampBeforeDelete(long originLastModified, long
timestamp) {
+ // note here is originLastModified may be 0
+ // 0 means resource doesn't exists in general, it's safe to pass the
check
+ boolean passCheck = originLastModified <= timestamp;
+ logger.trace("check timestamp before delete: {}, [originLastModified:
{}, timestamp: {}]", passCheck,
+ originLastModified, timestamp);
+ return passCheck;
+ }
+
/**
* called by ExponentialBackoffRetry, to check if an exception is due to
unreachable server and worth retry
*/
@@ -514,7 +571,7 @@ abstract public class ResourceStore {
resPath = resPath.substring(1);
while (resPath.endsWith("/"))
resPath = resPath.substring(0, resPath.length() - 1);
- if (resPath.startsWith("/") == false)
+ if (!resPath.startsWith("/"))
resPath = "/" + resPath;
return resPath;
}
@@ -570,7 +627,7 @@ abstract public class ResourceStore {
checkThread();
for (String resPath : origResData.keySet()) {
- logger.debug("Rollbacking " + resPath);
+ logger.debug("Rollbacking {}", resPath);
try {
byte[] data = origResData.get(resPath);
Long ts = origResTimestamp.get(resPath);
@@ -663,7 +720,8 @@ abstract public class ResourceStore {
* Visit all resource under a folder (optionally recursively), without
loading the content of resource.
* Low level API, DON'T support ExponentialBackoffRetry, caller should do
necessary retry
*/
- final public void visitFolder(String folderPath, boolean recursive,
VisitFilter filter, Visitor visitor) throws IOException {
+ final public void visitFolder(String folderPath, boolean recursive,
VisitFilter filter, Visitor visitor)
+ throws IOException {
visitFolderInner(folderPath, recursive, filter, false, visitor);
}
@@ -671,12 +729,14 @@ abstract public class ResourceStore {
* Visit all resource and their content under a folder (optionally
recursively).
* Low level API, DON'T support ExponentialBackoffRetry, caller should do
necessary retry
*/
- final public void visitFolderAndContent(String folderPath, boolean
recursive, VisitFilter filter, Visitor visitor) throws IOException {
+ final public void visitFolderAndContent(String folderPath, boolean
recursive, VisitFilter filter, Visitor visitor)
+ throws IOException {
visitFolderInner(folderPath, recursive, filter, true, visitor);
}
// Low level API, DON'T support ExponentialBackoffRetry, caller should do
necessary retry
- private void visitFolderInner(String folderPath, boolean recursive,
VisitFilter filter, boolean loadContent, Visitor visitor) throws IOException {
+ private void visitFolderInner(String folderPath, boolean recursive,
VisitFilter filter, boolean loadContent,
+ Visitor visitor) throws IOException {
if (filter == null)
filter = new VisitFilter();
@@ -700,7 +760,7 @@ abstract public class ResourceStore {
* NOTE: Broken content exception should be wrapped by RawResource, and
return to caller to decide how to handle.
*/
abstract protected void visitFolderImpl(String folderPath, boolean
recursive, VisitFilter filter,
- boolean loadContent, Visitor
visitor) throws IOException;
+ boolean loadContent, Visitor visitor) throws IOException;
public static String dumpResources(KylinConfig kylinConfig,
Collection<String> dumpList) throws IOException {
File tmp = File.createTempFile("kylin_job_meta", "");
@@ -729,7 +789,7 @@ abstract public class ResourceStore {
metaDirURI = "file://" + metaDirURI;
else
metaDirURI = "file:///" + metaDirURI;
- logger.info("meta dir is: " + metaDirURI);
+ logger.info("meta dir is: {}", metaDirURI);
return metaDirURI;
}
diff --git
a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index 828a935..8cd181a 100644
---
a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++
b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -63,6 +63,7 @@ public class ResourceStoreTest {
private static void testAStore(ResourceStore store) throws IOException {
testBasics(store);
testGetAllResources(store);
+ testUpdateResourceTimestamp(store);
}
private static void testPerformance(ResourceStore store) throws
IOException {
@@ -113,13 +114,25 @@ public class ResourceStoreTest {
String dir2 = "/table";
String path2 = "/table/_test.json";
StringEntity content2 = new StringEntity("something");
+ String dir3 = "/model_desc";
+ String path3 = "/model_desc/_test.json";
+ StringEntity content3 = new StringEntity("test check timestamp before
delete");
// cleanup legacy if any
store.deleteResource(path1);
store.deleteResource(path2);
+ store.deleteResource(path3);
StringEntity t;
+ // get non-exist
+ assertNull(store.getResource(path1));
+ assertNull(store.getResource(path1, StringEntity.serializer));
+ assertNull(store.getResource(path2));
+ assertNull(store.getResource(path2, StringEntity.serializer));
+ assertNull(store.getResource(path3));
+ assertNull(store.getResource(path3, StringEntity.serializer));
+
// put/get
store.checkAndPutResource(path1, content1, StringEntity.serializer);
assertTrue(store.exists(path1));
@@ -144,39 +157,86 @@ public class ResourceStoreTest {
// expected
}
+ // put path3
+ store.checkAndPutResource(path3, content3, StringEntity.serializer);
+ assertTrue(store.exists(path3));
+ t = store.getResource(path3, StringEntity.serializer);
+ assertEquals(content3, t);
+
// list
- NavigableSet<String> list = null;
+ NavigableSet<String> list;
list = store.listResources(dir1);
- System.out.println(list);
assertTrue(list.contains(path1));
- assertTrue(list.contains(path2) == false);
+ assertTrue(!list.contains(path2));
+ assertTrue(!list.contains(path3));
list = store.listResources(dir2);
assertTrue(list.contains(path2));
- assertTrue(list.contains(path1) == false);
+ assertTrue(!list.contains(path1));
+ assertTrue(!list.contains(path3));
+
+ list = store.listResources(dir3);
+ assertTrue(list.contains(path3));
+ assertTrue(!list.contains(path1));
+ assertTrue(!list.contains(path2));
list = store.listResources("/");
assertTrue(list.contains(dir1));
assertTrue(list.contains(dir2));
- assertTrue(list.contains(path1) == false);
- assertTrue(list.contains(path2) == false);
+ assertTrue(list.contains(dir3));
+ assertTrue(!list.contains(path1));
+ assertTrue(!list.contains(path2));
+ assertTrue(!list.contains(path3));
list = store.listResources(path1);
assertNull(list);
list = store.listResources(path2);
assertNull(list);
+ list = store.listResources(path3);
+ assertNull(list);
// delete/exist
store.deleteResource(path1);
- assertTrue(store.exists(path1) == false);
+ assertTrue(!store.exists(path1));
list = store.listResources(dir1);
- assertTrue(list == null || list.contains(path1) == false);
+ assertTrue(list == null || !list.contains(path1));
store.deleteResource(path2);
- assertTrue(store.exists(path2) == false);
+ assertTrue(!store.exists(path2));
list = store.listResources(dir2);
- assertTrue(list == null || list.contains(path2) == false);
+ assertTrue(list == null || !list.contains(path2));
+
+ long origLastModified = store.getResourceTimestamp(path3);
+ long beforeLastModified = origLastModified - 100;
+
+ // beforeLastModified < origLastModified ==> not delete expected
+ store.deleteResource(path3, beforeLastModified);
+ assertTrue(store.exists(path3));
+ list = store.listResources(dir3);
+ assertTrue(list != null && list.contains(path3));
+
+ // beforeLastModified = origLastModified ==> delete expected
+ store.deleteResource(path3, origLastModified);
+ assertTrue(!store.exists(path3));
+ list = store.listResources(dir3);
+ assertTrue(list == null || !list.contains(path3));
+
+ // put again
+ content3 = new StringEntity("test check timestamp before delete new");
+ store.checkAndPutResource(path3, content3, StringEntity.serializer);
+ assertTrue(store.exists(path3));
+ t = store.getResource(path3, StringEntity.serializer);
+ assertEquals(content3, t);
+
+ origLastModified = store.getResourceTimestamp(path3);
+ long afterLastModified = origLastModified + 100;
+
+ // afterLastModified > origLastModified ==> delete expected
+ store.deleteResource(path3, afterLastModified);
+ assertTrue(!store.exists(path3));
+ list = store.listResources(dir3);
+ assertTrue(list == null || !list.contains(path3));
}
private static long testWritePerformance(ResourceStore store) throws
IOException {
@@ -208,4 +268,44 @@ public class ResourceStoreTest {
return oldUrl;
}
+ private static void testUpdateResourceTimestamp(ResourceStore store)
throws IOException {
+ String dir1 = "/cube";
+ String path1 = "/cube/_test.json";
+ StringEntity content1 = new StringEntity("test update timestamp");
+ // cleanup legacy if any
+ store.deleteResource(path1);
+
+ // get non-exist
+ assertNull(store.getResource(path1));
+ assertNull(store.getResource(path1, StringEntity.serializer));
+
+ // put/get
+ StringEntity t;
+ store.checkAndPutResource(path1, content1, StringEntity.serializer);
+ assertTrue(store.exists(path1));
+ t = store.getResource(path1, StringEntity.serializer);
+ assertEquals(content1, t);
+
+ long oldTS = store.getResourceTimestamp(path1);
+ long newTS = oldTS + 1000;
+ // update timestamp to newTS
+ store.updateTimestamp(path1, newTS);
+
+ long updatedTS = store.getResourceTimestamp(path1);
+ assertEquals(updatedTS, newTS);
+
+ newTS = 0L;
+ store.updateTimestamp(path1, newTS);
+ updatedTS = store.getResourceTimestamp(path1);
+ assertEquals(updatedTS, newTS);
+
+ // delete/exist
+ NavigableSet<String> list;
+ store.deleteResource(path1);
+ assertTrue(!store.exists(path1));
+ list = store.listResources(dir1);
+ assertTrue(list == null || !list.contains(path1));
+
+ }
+
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index b2af656..bd9832f 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -330,7 +330,8 @@ public class CubeManager implements IRealizationProvider {
}
}
- public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String
lookupTableName, String newSnapshotResPath) throws IOException {
+ public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String
lookupTableName, String newSnapshotResPath)
+ throws IOException {
try (AutoLock lock = cubeMapLock.lockForWrite()) {
cube = cube.latestCopyForWrite();
@@ -419,7 +420,7 @@ public class CubeManager implements IRealizationProvider {
}
if (update.getUpdateTableSnapshotPath() != null) {
- for(Map.Entry<String, String> lookupSnapshotPathEntry :
update.getUpdateTableSnapshotPath().entrySet()) {
+ for (Map.Entry<String, String> lookupSnapshotPathEntry :
update.getUpdateTableSnapshotPath().entrySet()) {
cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(),
lookupSnapshotPathEntry.getValue());
}
}
@@ -444,7 +445,8 @@ public class CubeManager implements IRealizationProvider {
}
}
- private void processToRemoveSegments(CubeUpdate update,
Segments<CubeSegment> newSegs, List<String> toRemoveResources) {
+ private void processToRemoveSegments(CubeUpdate update,
Segments<CubeSegment> newSegs,
+ List<String> toRemoveResources) {
Iterator<CubeSegment> iterator = newSegs.iterator();
while (iterator.hasNext()) {
CubeSegment currentSeg = iterator.next();
@@ -460,7 +462,7 @@ public class CubeManager implements IRealizationProvider {
}
// for test
- CubeInstance reloadCube(String cubeName) {
+ public CubeInstance reloadCube(String cubeName) {
try (AutoLock lock = cubeMapLock.lockForWrite()) {
return crud.reload(cubeName);
}
@@ -522,7 +524,8 @@ public class CubeManager implements IRealizationProvider {
}
}
- private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc
join, SnapshotTableDesc snapshotTableDesc) {
+ private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc
join,
+ SnapshotTableDesc snapshotTableDesc) {
String tableName = join.getPKSide().getTableIdentity();
String snapshotResPath = getSnapshotResPath(cubeSegment, tableName,
snapshotTableDesc);
String[] pkCols = join.getPrimaryKey();
@@ -537,11 +540,12 @@ public class CubeManager implements IRealizationProvider {
}
}
- private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String
tableName, SnapshotTableDesc snapshotTableDesc) {
+ private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String
tableName,
+ SnapshotTableDesc snapshotTableDesc) {
String snapshotResPath = getSnapshotResPath(cubeSegment, tableName,
snapshotTableDesc);
- ExtTableSnapshotInfo extTableSnapshot =
ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
- snapshotResPath);
+ ExtTableSnapshotInfo extTableSnapshot =
ExtTableSnapshotInfoManager.getInstance(config)
+ .getSnapshot(snapshotResPath);
TableDesc tableDesc = getMetadataManager().getTableDesc(tableName,
cubeSegment.getProject());
return LookupProviderFactory.getExtLookupTable(tableDesc,
extTableSnapshot);
}
@@ -874,8 +878,7 @@ public class CubeManager implements IRealizationProvider {
if (pair == null)
throw new IllegalArgumentException(
"Find no segments to merge by " + tsRange + " for
cube " + cubeCopy);
- segRange = new
SegmentRange(pair.getFirst().getSegRange().start,
- pair.getSecond().getSegRange().end);
+ segRange = new
SegmentRange(pair.getFirst().getSegRange().start,
pair.getSecond().getSegRange().end);
}
return segRange;
}
@@ -931,9 +934,8 @@ public class CubeManager implements IRealizationProvider {
cubeCopy.toString(), newSegCopy.toString()));
if (StringUtils.isBlank(newSegCopy.getLastBuildJobID()))
- throw new IllegalStateException(
- String.format(Locale.ROOT, "For cube %s, segment %s
missing LastBuildJobID",
- cubeCopy.toString(), newSegCopy.toString()));
+ throw new IllegalStateException(String.format(Locale.ROOT,
+ "For cube %s, segment %s missing LastBuildJobID",
cubeCopy.toString(), newSegCopy.toString()));
if (isReady(newSegCopy) == true) {
logger.warn("For cube {}, segment {} state should be NEW but
is READY", cubeCopy, newSegCopy);
@@ -985,9 +987,9 @@ public class CubeManager implements IRealizationProvider {
CubeSegment[] optSegCopy =
cubeCopy.regetSegments(optimizedSegments);
if (cubeCopy.getSegments().size() != optSegCopy.length * 2) {
- throw new IllegalStateException(
- String.format(Locale.ROOT, "For cube %s, every READY
segment should be optimized and all segments should be READY before optimizing",
- cubeCopy.toString()));
+ throw new IllegalStateException(String.format(Locale.ROOT,
+ "For cube %s, every READY segment should be optimized
and all segments should be READY before optimizing",
+ cubeCopy.toString()));
}
CubeSegment[] originalSegments = new
CubeSegment[optSegCopy.length];
@@ -1001,15 +1003,14 @@ public class CubeManager implements
IRealizationProvider {
cubeCopy.toString(), seg.toString()));
if (StringUtils.isBlank(seg.getLastBuildJobID()))
- throw new IllegalStateException(
- String.format(Locale.ROOT, "For cube %s, segment
%s missing LastBuildJobID",
- cubeCopy.toString(), seg.toString()));
+ throw new IllegalStateException(String.format(Locale.ROOT,
+ "For cube %s, segment %s missing LastBuildJobID",
cubeCopy.toString(), seg.toString()));
seg.setStatus(SegmentStatusEnum.READY);
}
- logger.info("Promoting cube {}, new segments {}, to remove
segments {}",
- cubeCopy, Arrays.toString(optSegCopy),
originalSegments);
+ logger.info("Promoting cube {}, new segments {}, to remove
segments {}", cubeCopy,
+ Arrays.toString(optSegCopy), originalSegments);
CubeUpdate update = new CubeUpdate(cubeCopy);
update.setToRemoveSegs(originalSegments) //
@@ -1026,9 +1027,9 @@ public class CubeManager implements IRealizationProvider {
List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
List<CubeSegment> newList = Arrays.asList(newSegments);
if (tobe.containsAll(newList) == false) {
- throw new IllegalStateException(
- String.format(Locale.ROOT, "For cube %s, the new
segments %s do not fit in its current %s; the resulted tobe is %s",
- cube.toString(), newList.toString(),
cube.getSegments().toString(), tobe.toString()));
+ throw new IllegalStateException(String.format(Locale.ROOT,
+ "For cube %s, the new segments %s do not fit in its
current %s; the resulted tobe is %s",
+ cube.toString(), newList.toString(),
cube.getSegments().toString(), tobe.toString()));
}
}
@@ -1171,7 +1172,8 @@ public class CubeManager implements IRealizationProvider {
return (Dictionary<String>) info.getDictionaryObject();
}
- public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String
lookupTable, String uuid) throws IOException {
+ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String
lookupTable, String uuid)
+ throws IOException {
// work on copy instead of cached objects
CubeInstance cubeCopy =
cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
diff --git
a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 1bec02a..67b7bdb 100644
---
a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -20,18 +20,24 @@ package org.apache.kylin.cube.cli;
import java.io.IOException;
import java.util.Locale;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
import org.apache.kylin.dict.DictionaryProvider;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
@@ -43,7 +49,8 @@ import com.google.common.collect.Sets;
public class DictionaryGeneratorCLI {
- private DictionaryGeneratorCLI(){}
+ private DictionaryGeneratorCLI() {
+ }
private static final Logger logger =
LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
@@ -52,7 +59,26 @@ public class DictionaryGeneratorCLI {
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeSegment segment = cube.getSegmentById(segmentID);
- processSegment(config, segment, uuid, factTableValueProvider,
dictProvider);
+ int retryTime = 0;
+ while (retryTime < 3) {
+ if (retryTime > 0) {
+ logger.info("Rebuild dictionary and snapshot for Cube: {},
Segment: {}, {} times.", cubeName, segmentID,
+ retryTime);
+ }
+
+ processSegment(config, segment, uuid, factTableValueProvider,
dictProvider);
+
+ if (isAllDictsAndSnapshotsReady(config, cubeName, segmentID)) {
+ break;
+ }
+ retryTime++;
+ }
+
+ if (retryTime >= 3) {
+ logger.error("Not all dictionaries and snapshots ready for cube
segment: {}", segmentID);
+ } else {
+ logger.info("Succeed to build all dictionaries and snapshots for
cube segment: {}", segmentID);
+ }
}
private static void processSegment(KylinConfig config, CubeSegment
cubeSeg, String uuid,
@@ -113,4 +139,51 @@ public class DictionaryGeneratorCLI {
}
}
+ private static boolean isAllDictsAndSnapshotsReady(KylinConfig config,
String cubeName, String segmentID) {
+ CubeInstance cube =
CubeManager.getInstance(config).reloadCube(cubeName);
+ CubeSegment segment = cube.getSegmentById(segmentID);
+ ResourceStore store = ResourceStore.getStore(config);
+
+ // check dicts
+ logger.info("Begin to check if all dictionaries exist of Segment: {}",
segmentID);
+ Map<String, String> dictionaries = segment.getDictionaries();
+ for (Map.Entry<String, String> entry : dictionaries.entrySet()) {
+ String dictResPath = entry.getValue();
+ String dictKey = entry.getKey();
+ try {
+ DictionaryInfo dictInfo = store.getResource(dictResPath,
DictionaryInfoSerializer.INFO_SERIALIZER);
+ if (dictInfo == null) {
+ logger.warn("Dictionary=[key: {}, resource path: {}]
doesn't exist in resource store", dictKey,
+ dictResPath);
+ return false;
+ }
+ } catch (IOException e) {
+ logger.warn("Dictionary=[key: {}, path: {}] failed to check,
details: {}", dictKey, dictResPath, e);
+ return false;
+ }
+ }
+
+ // check snapshots
+ logger.info("Begin to check if all snapshots exist of Segment: {}",
segmentID);
+ Map<String, String> snapshots = segment.getSnapshots();
+ for (Map.Entry<String, String> entry : snapshots.entrySet()) {
+ String snapshotKey = entry.getKey();
+ String snapshotResPath = entry.getValue();
+ try {
+ SnapshotTable snapshot = store.getResource(snapshotResPath,
SnapshotTableSerializer.INFO_SERIALIZER);
+ if (snapshot == null) {
+ logger.info("SnapshotTable=[key: {}, resource path: {}]
doesn't exist in resource store",
+ snapshotKey, snapshotResPath);
+ return false;
+ }
+ } catch (IOException e) {
+ logger.warn("SnapshotTable=[key: {}, resource path: {}]
failed to check, details: {}", snapshotKey,
+ snapshotResPath, e);
+ return false;
+ }
+ }
+
+ logger.info("All dictionaries and snapshots exist checking succeed for
Cube Segment: {}", segmentID);
+ return true;
+ }
}
diff --git
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 016700f..ffee105 100755
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,12 +18,13 @@
package org.apache.kylin.dict;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ClassUtil;
@@ -36,11 +37,12 @@ import
org.apache.kylin.source.IReadableTable.TableSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
public class DictionaryManager {
@@ -69,8 +71,8 @@ public class DictionaryManager {
.removalListener(new RemovalListener<String, DictionaryInfo>()
{
@Override
public void onRemoval(RemovalNotification<String,
DictionaryInfo> notification) {
- DictionaryManager.logger.info("Dict with resource path
" + notification.getKey()
- + " is removed due to " +
notification.getCause());
+ DictionaryManager.logger.info("Dict with resource path
{} is removed due to {}",
+ notification.getKey(),
notification.getCause());
}
})//
.maximumSize(config.getCachedDictMaxEntrySize())//
@@ -134,10 +136,11 @@ public class DictionaryManager {
largestDictInfo =
getDictionaryInfo(largestDictInfo.getResourcePath());
Dictionary<String> largestDictObject =
largestDictInfo.getDictionaryObject();
if (largestDictObject.contains(newDict)) {
- logger.info("dictionary content " + newDict + ", is
contained by dictionary at " + largestDictInfo.getResourcePath());
+ logger.info("dictionary content {}, is contained by
dictionary at {}", newDict,
+ largestDictInfo.getResourcePath());
return largestDictInfo;
} else if (newDict.contains(largestDictObject)) {
- logger.info("dictionary content " + newDict + " is by far
the largest, save it");
+ logger.info("dictionary content {} is by far the largest,
save it", newDict);
return saveNewDict(newDictInfo);
} else {
logger.info("merge dict and save...");
@@ -148,17 +151,18 @@ public class DictionaryManager {
return saveNewDict(newDictInfo);
}
} else {
- String dupDict = checkDupByContent(newDictInfo, newDict);
+ DictionaryInfo dupDict = checkDupByContent(newDictInfo, newDict);
if (dupDict != null) {
- logger.info("Identical dictionary content, reuse existing
dictionary at " + dupDict);
- return getDictionaryInfo(dupDict);
+ logger.info("Identical dictionary content, reuse existing
dictionary at {}", dupDict.getResourcePath());
+ dupDict =
updateExistingDictLastModifiedTime(dupDict.getResourcePath());
+ return dupDict;
}
return saveNewDict(newDictInfo);
}
}
- private String checkDupByContent(DictionaryInfo dictInfo,
Dictionary<String> dict) throws IOException {
+ private DictionaryInfo checkDupByContent(DictionaryInfo dictInfo,
Dictionary<String> dict) throws IOException {
ResourceStore store = getStore();
NavigableSet<String> existings =
store.listResources(dictInfo.getResourceDir());
if (existings == null)
@@ -170,19 +174,33 @@ public class DictionaryManager {
}
for (String existing : existings) {
- DictionaryInfo existingInfo = getDictionaryInfo(existing);
- if (existingInfo != null) {
- if ((config.isDictResuable() &&
existingInfo.getDictionaryObject().contains(dict))
- || dict.equals(existingInfo.getDictionaryObject())) {
- return existing;
+ try {
+ if (existing.endsWith(".dict")) {
+ DictionaryInfo existingInfo = getDictionaryInfo(existing);
+ if (existingInfo != null &&
dict.equals(existingInfo.getDictionaryObject())) {
+ return existingInfo;
+ }
}
-
+ } catch (Exception ex) {
+ logger.error("Tolerate exception checking dup dictionary " +
existing, ex);
}
}
return null;
}
+ private DictionaryInfo updateExistingDictLastModifiedTime(String dictPath)
throws IOException {
+ ResourceStore store = getStore();
+ if (StringUtils.isBlank(dictPath))
+ return NONE_INDICATOR;
+ long now = System.currentTimeMillis();
+ store.updateTimestamp(dictPath, now);
+ logger.info("Update dictionary {} lastModifiedTime to {}", dictPath,
now);
+ DictionaryInfo dictInfo = load(dictPath, true);
+ updateDictCache(dictInfo);
+ return dictInfo;
+ }
+
private void initDictInfo(Dictionary<String> newDict, DictionaryInfo
newDictInfo) {
newDictInfo.setCardinality(newDict.getSize());
newDictInfo.setDictionaryObject(newDict);
@@ -190,16 +208,14 @@ public class DictionaryManager {
}
private DictionaryInfo saveNewDict(DictionaryInfo newDictInfo) throws
IOException {
-
save(newDictInfo);
- dictCache.put(newDictInfo.getResourcePath(), newDictInfo);
-
+ updateDictCache(newDictInfo);
return newDictInfo;
}
public DictionaryInfo mergeDictionary(List<DictionaryInfo> dicts) throws
IOException {
- if (dicts.size() == 0)
+ if (dicts.isEmpty())
return null;
if (dicts.size() == 1)
@@ -209,7 +225,7 @@ public class DictionaryManager {
* AppendTrieDictionary needn't merge
* more than one AppendTrieDictionary will generate when user use
{@link SegmentAppendTrieDictBuilder}
*/
- for (DictionaryInfo dict: dicts) {
+ for (DictionaryInfo dict : dicts) {
if
(dict.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
return dict;
}
@@ -224,7 +240,8 @@ public class DictionaryManager {
} else {
if (!firstDictInfo.isDictOnSameColumn(info)) {
// don't throw exception, just output warning as legacy
cube segment may build dict on PK
- logger.warn("Merging dictionaries are not structurally
equal : " + firstDictInfo.getResourcePath() + " and " + info.getResourcePath());
+ logger.warn("Merging dictionaries are not structurally
equal : {} and {}",
+ firstDictInfo.getResourcePath(),
info.getResourcePath());
}
}
totalSize += info.getInput().getSize();
@@ -241,12 +258,6 @@ public class DictionaryManager {
signature.setLastModifiedTime(System.currentTimeMillis());
signature.setPath("merged_with_no_original_path");
- // String dupDict = checkDupByInfo(newDictInfo);
- // if (dupDict != null) {
- // logger.info("Identical dictionary input " +
newDictInfo.getInput() + ", reuse existing dictionary at " + dupDict);
- // return getDictionaryInfo(dupDict);
- // }
-
//check for cases where merging dicts are actually same
boolean identicalSourceDicts = true;
for (int i = 1; i < dicts.size(); ++i) {
@@ -260,7 +271,8 @@ public class DictionaryManager {
logger.info("Use one of the merging dictionaries directly");
return dicts.get(0);
} else {
- Dictionary<String> newDict =
DictionaryGenerator.mergeDictionaries(DataType.getType(newDictInfo.getDataType()),
dicts);
+ Dictionary<String> newDict = DictionaryGenerator
+
.mergeDictionaries(DataType.getType(newDictInfo.getDataType()), dicts);
return trySaveNewDict(newDict, newDictInfo);
}
}
@@ -269,33 +281,38 @@ public class DictionaryManager {
return buildDictionary(col, inpTable, null);
}
- public DictionaryInfo buildDictionary(TblColRef col, IReadableTable
inpTable, String builderClass) throws IOException {
- if (inpTable.exists() == false)
+ public DictionaryInfo buildDictionary(TblColRef col, IReadableTable
inpTable, String builderClass)
+ throws IOException {
+ if (!inpTable.exists())
return null;
- logger.info("building dictionary for " + col);
+ logger.info("building dictionary for {}", col);
DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable);
String dupInfo = checkDupByInfo(dictInfo);
if (dupInfo != null) {
- logger.info("Identical dictionary input " + dictInfo.getInput() +
", reuse existing dictionary at " + dupInfo);
- return getDictionaryInfo(dupInfo);
+ logger.info("Identical dictionary input {}, reuse existing
dictionary at {}", dictInfo.getInput(), dupInfo);
+ DictionaryInfo dupDictInfo =
updateExistingDictLastModifiedTime(dupInfo);
+ return dupDictInfo;
}
- logger.info("Building dictionary object " +
JsonUtil.writeValueAsString(dictInfo));
+ logger.info("Building dictionary object {}",
JsonUtil.writeValueAsString(dictInfo));
Dictionary<String> dictionary;
dictionary = buildDictFromReadableTable(inpTable, dictInfo,
builderClass, col);
return trySaveNewDict(dictionary, dictInfo);
}
- private Dictionary<String> buildDictFromReadableTable(IReadableTable
inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws
IOException {
+ private Dictionary<String> buildDictFromReadableTable(IReadableTable
inpTable, DictionaryInfo dictInfo,
+ String builderClass, TblColRef col) throws IOException {
Dictionary<String> dictionary;
IDictionaryValueEnumerator columnValueEnumerator = null;
try {
- columnValueEnumerator = new
TableColumnValueEnumerator(inpTable.getReader(),
dictInfo.getSourceColumnIndex());
+ columnValueEnumerator = new
TableColumnValueEnumerator(inpTable.getReader(),
+ dictInfo.getSourceColumnIndex());
if (builderClass == null) {
- dictionary =
DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()),
columnValueEnumerator);
+ dictionary =
DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()),
+ columnValueEnumerator);
} else {
IDictionaryBuilder builder = (IDictionaryBuilder)
ClassUtil.newInstance(builderClass);
dictionary = DictionaryGenerator.buildDictionary(builder,
dictInfo, columnValueEnumerator);
@@ -309,12 +326,14 @@ public class DictionaryManager {
return dictionary;
}
- public DictionaryInfo saveDictionary(TblColRef col, IReadableTable
inpTable, Dictionary<String> dictionary) throws IOException {
+ public DictionaryInfo saveDictionary(TblColRef col, IReadableTable
inpTable, Dictionary<String> dictionary)
+ throws IOException {
DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable);
String dupInfo = checkDupByInfo(dictInfo);
if (dupInfo != null) {
- logger.info("Identical dictionary input " + dictInfo.getInput() +
", reuse existing dictionary at " + dupInfo);
- return getDictionaryInfo(dupInfo);
+ logger.info("Identical dictionary input {}, reuse existing
dictionary at {}", dictInfo.getInput(), dupInfo);
+ DictionaryInfo dupDictInfo =
updateExistingDictLastModifiedTime(dupInfo);
+ return dupDictInfo;
}
return trySaveNewDict(dictionary, dictInfo);
@@ -331,7 +350,8 @@ public class DictionaryManager {
private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
final ResourceStore store = getStore();
- final List<DictionaryInfo> allResources =
store.getAllResources(dictInfo.getResourceDir(),
DictionaryInfoSerializer.INFO_SERIALIZER);
+ final List<DictionaryInfo> allResources =
store.getAllResources(dictInfo.getResourceDir(),
+ DictionaryInfoSerializer.INFO_SERIALIZER);
TableSignature input = dictInfo.getInput();
@@ -345,7 +365,8 @@ public class DictionaryManager {
private DictionaryInfo findLargestDictInfo(DictionaryInfo dictInfo) throws
IOException {
final ResourceStore store = getStore();
- final List<DictionaryInfo> allResources =
store.getAllResources(dictInfo.getResourceDir(),
DictionaryInfoSerializer.INFO_SERIALIZER);
+ final List<DictionaryInfo> allResources =
store.getAllResources(dictInfo.getResourceDir(),
+ DictionaryInfoSerializer.INFO_SERIALIZER);
DictionaryInfo largestDict = null;
for (DictionaryInfo dictionaryInfo : allResources) {
@@ -362,7 +383,7 @@ public class DictionaryManager {
}
public void removeDictionary(String resourcePath) throws IOException {
- logger.info("Remvoing dict: " + resourcePath);
+ logger.info("Remvoing dict: {}", resourcePath);
ResourceStore store = getStore();
store.deleteResource(resourcePath);
dictCache.invalidate(resourcePath);
@@ -385,7 +406,7 @@ public class DictionaryManager {
void save(DictionaryInfo dict) throws IOException {
ResourceStore store = getStore();
String path = dict.getResourcePath();
- logger.info("Saving dictionary at " + path);
+ logger.info("Saving dictionary at {}", path);
store.putBigResource(path, dict, System.currentTimeMillis(),
DictionaryInfoSerializer.FULL_SERIALIZER);
}
@@ -393,11 +414,19 @@ public class DictionaryManager {
DictionaryInfo load(String resourcePath, boolean loadDictObj) throws
IOException {
ResourceStore store = getStore();
- logger.info("DictionaryManager(" + System.identityHashCode(this) + ")
loading DictionaryInfo(loadDictObj:" + loadDictObj + ") at " + resourcePath);
- DictionaryInfo info = store.getResource(resourcePath, loadDictObj ?
DictionaryInfoSerializer.FULL_SERIALIZER :
DictionaryInfoSerializer.INFO_SERIALIZER);
+ if (loadDictObj) {
+ logger.info("Loading dictionary at {}", resourcePath);
+ }
+
+ DictionaryInfo info = store.getResource(resourcePath,
+ loadDictObj ? DictionaryInfoSerializer.FULL_SERIALIZER :
DictionaryInfoSerializer.INFO_SERIALIZER);
return info;
}
+ private void updateDictCache(DictionaryInfo newDictInfo) {
+ dictCache.put(newDictInfo.getResourcePath(), newDictInfo);
+ }
+
private ResourceStore getStore() {
return ResourceStore.getStore(config);
}
diff --git
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 8e63989..8f68fb0 100644
---
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -24,7 +24,6 @@ import java.util.NavigableSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Lists;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.metadata.TableMetadataManager;
@@ -39,6 +38,9 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.google.common.collect.Lists;
/**
* @author yangli9
@@ -68,7 +70,8 @@ public class SnapshotManager {
this.snapshotCache = CacheBuilder.newBuilder().removalListener(new
RemovalListener<String, SnapshotTable>() {
@Override
public void onRemoval(RemovalNotification<String, SnapshotTable>
notification) {
- SnapshotManager.logger.info("Snapshot with resource path " +
notification.getKey() + " is removed due to " + notification.getCause());
+ SnapshotManager.logger.info("Snapshot with resource path {} is
removed due to {}",
+ notification.getKey(), notification.getCause());
}
}).maximumSize(config.getCachedSnapshotMaxEntrySize())//
.expireAfterWrite(1, TimeUnit.DAYS).build(new
CacheLoader<String, SnapshotTable>() {
@@ -88,8 +91,7 @@ public class SnapshotManager {
try {
SnapshotTable r = snapshotCache.get(resourcePath);
if (r == null) {
- r = load(resourcePath, true);
- snapshotCache.put(resourcePath, r);
+ r = loadAndUpdateLocalCache(resourcePath);
}
return r;
} catch (ExecutionException e) {
@@ -97,6 +99,12 @@ public class SnapshotManager {
}
}
+ private SnapshotTable loadAndUpdateLocalCache(String snapshotResPath)
throws IOException {
+ SnapshotTable snapshotTable = load(snapshotResPath, true);
+ snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
+ return snapshotTable;
+ }
+
public List<SnapshotTable> getSnapshots(String tableName, TableSignature
sourceTableSignature) throws IOException {
List<SnapshotTable> result = Lists.newArrayList();
String tableSnapshotsPath = SnapshotTable.getResourceDir(tableName);
@@ -115,34 +123,34 @@ public class SnapshotManager {
snapshotCache.invalidate(resourcePath);
}
- public SnapshotTable buildSnapshot(IReadableTable table, TableDesc
tableDesc, KylinConfig cubeConfig) throws IOException {
+ public SnapshotTable buildSnapshot(IReadableTable table, TableDesc
tableDesc, KylinConfig cubeConfig)
+ throws IOException {
SnapshotTable snapshot = new SnapshotTable(table,
tableDesc.getIdentity());
snapshot.updateRandomUuid();
+ Interner<String> pool = Interners.newWeakInterner();
- String dup = checkDupByInfo(snapshot);
- if (dup != null) {
- logger.info("Identical input " + table.getSignature() + ", reuse
existing snapshot at " + dup);
- return getSnapshotTable(dup);
- }
+ synchronized (pool.intern(tableDesc.getIdentity())) {
+ SnapshotTable reusableSnapshot = getReusableSnapShot(table,
snapshot, tableDesc, cubeConfig);
+ if (reusableSnapshot != null)
+ return
updateDictLastModifiedTime(reusableSnapshot.getResourcePath());
- if ((float) snapshot.getSignature().getSize() / 1024 / 1024 >
cubeConfig.getTableSnapshotMaxMB()) {
- throw new IllegalStateException("Table snapshot should be no
greater than " + cubeConfig.getTableSnapshotMaxMB() //
- + " MB, but " + tableDesc + " size is " +
snapshot.getSignature().getSize());
+ snapshot.takeSnapshot(table, tableDesc);
+ return trySaveNewSnapshot(snapshot);
}
-
- snapshot.takeSnapshot(table, tableDesc);
-
- return trySaveNewSnapshot(snapshot);
}
- public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc
tableDesc, String overwriteUUID) throws IOException {
+ public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc
tableDesc, String overwriteUUID)
+ throws IOException {
SnapshotTable snapshot = new SnapshotTable(table,
tableDesc.getIdentity());
snapshot.setUuid(overwriteUUID);
-
snapshot.takeSnapshot(table, tableDesc);
- SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath());
- snapshot.setLastModified(existing.getLastModified());
+ try {
+ SnapshotTable existing =
getSnapshotTable(snapshot.getResourcePath());
+ snapshot.setLastModified(existing.getLastModified());
+ } catch (Exception ex) {
+ logger.error("Error reading {}, delete it and save rebuild",
snapshot.getResourcePath(), ex);
+ }
save(snapshot);
snapshotCache.put(snapshot.getResourcePath(), snapshot);
@@ -150,12 +158,30 @@ public class SnapshotManager {
return snapshot;
}
+ private SnapshotTable getReusableSnapShot(IReadableTable table,
SnapshotTable snapshot, TableDesc tableDesc,
+ KylinConfig cubeConfig) throws IOException {
+ String dup = checkDupByInfo(snapshot);
+
+ if ((float) snapshot.getSignature().getSize() / 1024 / 1024 >
cubeConfig.getTableSnapshotMaxMB()) {
+ throw new IllegalStateException(
+ "Table snapshot should be no greater than " +
cubeConfig.getTableSnapshotMaxMB() //
+ + " MB, but " + tableDesc + " size is " +
snapshot.getSignature().getSize());
+ }
+
+ if (dup != null) {
+ logger.info("Identical input {}, reuse existing snapshot at {}",
table.getSignature(), dup);
+ return getSnapshotTable(dup);
+ } else {
+ return null;
+ }
+ }
+
public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable)
throws IOException {
String dupTable = checkDupByContent(snapshotTable);
if (dupTable != null) {
- logger.info("Identical snapshot content " + snapshotTable + ",
reuse existing snapshot at " + dupTable);
- return getSnapshotTable(dupTable);
+ logger.info("Identical snapshot content {}, reuse existing
snapshot at {}", snapshotTable, dupTable);
+ return updateDictLastModifiedTime(dupTable);
}
save(snapshotTable);
@@ -198,6 +224,16 @@ public class SnapshotManager {
return null;
}
+ private SnapshotTable updateDictLastModifiedTime(String snapshotPath)
throws IOException {
+ ResourceStore store = getStore();
+ long now = System.currentTimeMillis();
+ store.updateTimestamp(snapshotPath, now);
+ logger.info("Update snapshotTable {} lastModifiedTime to {}",
snapshotPath, now);
+
+ // update cache
+ return loadAndUpdateLocalCache(snapshotPath);
+ }
+
private void save(SnapshotTable snapshot) throws IOException {
ResourceStore store = getStore();
String path = snapshot.getResourcePath();
@@ -205,13 +241,14 @@ public class SnapshotManager {
}
private SnapshotTable load(String resourcePath, boolean loadData) throws
IOException {
- logger.info("Loading snapshotTable from " + resourcePath + ", with
loadData: " + loadData);
+ logger.info("Loading snapshotTable from {}, with loadData: {}",
resourcePath, loadData);
ResourceStore store = getStore();
- SnapshotTable table = store.getResource(resourcePath, loadData ?
SnapshotTableSerializer.FULL_SERIALIZER :
SnapshotTableSerializer.INFO_SERIALIZER);
+ SnapshotTable table = store.getResource(resourcePath,
+ loadData ? SnapshotTableSerializer.FULL_SERIALIZER :
SnapshotTableSerializer.INFO_SERIALIZER);
if (loadData)
- logger.debug("Loaded snapshot at " + resourcePath);
+ logger.debug("Loaded snapshot at {}", resourcePath);
return table;
}
diff --git
a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
index 6a86e33..bf97cc9 100755
---
a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
+++
b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.kylin.dict;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -45,9 +46,8 @@ public class DictionaryManagerTest extends
LocalFileMetadataTestCase {
cleanupTestMetadata();
}
-
@Test
- public void testBuildSaveDictionary() throws IOException {
+ public void testBuildSaveDictionary() throws IOException,
InterruptedException {
KylinConfig config = KylinConfig.getInstanceFromEnv();
DictionaryManager dictMgr = DictionaryManager.getInstance(config);
DataModelManager metaMgr = DataModelManager.getInstance(config);
@@ -57,25 +57,48 @@ public class DictionaryManagerTest extends
LocalFileMetadataTestCase {
// non-exist input returns null;
DictionaryInfo nullInfo = dictMgr.buildDictionary(col,
MockupReadableTable.newNonExistTable("/a/path"));
assertEquals(null, nullInfo);
-
- DictionaryInfo info1 = dictMgr.buildDictionary(col,
MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3"));
+
+ DictionaryInfo info1 = dictMgr.buildDictionary(col,
+ MockupReadableTable.newSingleColumnTable("/a/path", "1", "2",
"3"));
assertEquals(3, info1.getDictionaryObject().getSize());
+ long info1LastModified = info1.getLastModified();
+
// same input returns same dict
- DictionaryInfo info2 = dictMgr.buildDictionary(col,
MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3"));
- assertTrue(info1 == info2);
-
+ // sleep 1 second to avoid file resource store timestamp precision loss when updating
+ Thread.sleep(1000);
+ DictionaryInfo info2 = dictMgr.buildDictionary(col,
+ MockupReadableTable.newSingleColumnTable("/a/path", "1", "2",
"3"));
+ assertTrue(info1 != info2);
+ assertEquals(info1.getResourcePath(), info2.getResourcePath());
+
+ // update last modified when reused dict
+ long info2LastModified = info2.getLastModified();
+ assertTrue(info2LastModified > info1LastModified);
+
// same input values (different path) returns same dict
- DictionaryInfo info3 = dictMgr.buildDictionary(col,
MockupReadableTable.newSingleColumnTable("/a/different/path", "1", "2", "3"));
- assertTrue(info1 == info3);
-
+ // sleep 1 second to avoid file resource store timestamp precision loss when updating
+ Thread.sleep(1000);
+ DictionaryInfo info3 = dictMgr.buildDictionary(col,
+ MockupReadableTable.newSingleColumnTable("/a/different/path",
"1", "2", "3"));
+ assertTrue(info1 != info3);
+ assertTrue(info2 != info3);
+ assertEquals(info1.getResourcePath(), info3.getResourcePath());
+ assertEquals(info2.getResourcePath(), info3.getResourcePath());
+
+ // update last modified when reused dict
+ long info3LastModified = info3.getLastModified();
+ assertTrue(info3LastModified > info2LastModified);
+
// save dictionary works in spite of non-exist table
- Dictionary<String> dict =
DictionaryGenerator.buildDictionary(col.getType(), new
IterableDictionaryValueEnumerator("1", "2", "3"));
+ Dictionary<String> dict =
DictionaryGenerator.buildDictionary(col.getType(),
+ new IterableDictionaryValueEnumerator("1", "2", "3"));
DictionaryInfo info4 = dictMgr.saveDictionary(col,
MockupReadableTable.newNonExistTable("/a/path"), dict);
- assertTrue(info1 == info4);
-
- Dictionary<String> dict2 =
DictionaryGenerator.buildDictionary(col.getType(), new
IterableDictionaryValueEnumerator("1", "2", "3", "4"));
+ assertEquals(info1.getResourcePath(), info4.getResourcePath());
+
+ Dictionary<String> dict2 =
DictionaryGenerator.buildDictionary(col.getType(),
+ new IterableDictionaryValueEnumerator("1", "2", "3", "4"));
DictionaryInfo info5 = dictMgr.saveDictionary(col,
MockupReadableTable.newNonExistTable("/a/path"), dict2);
- assertTrue(info1 != info5);
+ assertNotEquals(info1.getResourcePath(), info5.getResourcePath());
}
}
diff --git
a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java
b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java
new file mode 100644
index 0000000..ab6ce13
--- /dev/null
+++
b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.MockupReadableTable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SnapshotManagerTest extends LocalFileMetadataTestCase {
+
+ private KylinConfig kylinConfig;
+ private SnapshotManager snapshotManager;
+ List<String[]> expect;
+ List<String[]> dif;
+ // test data for scd1
+ List<String[]> contentAtTime1;
+ List<String[]> contentAtTime2;
+ List<String[]> contentAtTime3;
+
+ @Before
+ public void setup() throws Exception {
+ this.createTestMetadata();
+ String[] s1 = new String[] { "1", "CN" };
+ String[] s2 = new String[] { "2", "NA" };
+ String[] s3 = new String[] { "3", "NA" };
+ String[] s4 = new String[] { "4", "KR" };
+ String[] s5 = new String[] { "5", "JP" };
+ String[] s6 = new String[] { "6", "CA" };
+ expect = Lists.newArrayList(s1, s2, s3, s4, s5);
+ dif = Lists.newArrayList(s1, s2, s3, s4, s6);
+
+ contentAtTime1 = Lists.newArrayList(s1, s2, s3, s4, s5, s6);
+ String[] s22 = new String[] { "2", "SP" };
+ contentAtTime2 = Lists.newArrayList(s1, s22, s3, s4, s5);
+ String[] s23 = new String[] { "2", "US" };
+ contentAtTime3 = Lists.newArrayList(s1, s23, s3, s4, s5);
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ private TableDesc genTableDesc(String tableName) {
+ TableDesc table = TableDesc.mockup(tableName);
+ ColumnDesc desc1 = new ColumnDesc("1", "id", "string", null, null,
null, null);
+ desc1.setId("1");
+ desc1.setDatatype("long");
+ ColumnDesc desc2 = new ColumnDesc("2", "country", "string", null,
null, null, null);
+ desc2.setId("2");
+ desc2.setDatatype("string");
+ ColumnDesc[] columns = { desc1, desc2 };
+ table.setColumns(columns);
+ table.init(kylinConfig, "default");
+ return table;
+ }
+
+ private IReadableTable genTable(String path, List<String[]> content) {
+ IReadableTable.TableSignature signature = new
IReadableTable.TableSignature(path, content.size(), 0);
+ return new MockupReadableTable(content, signature, true);
+ }
+
+ @Test
+ public void testCheckByContent() throws IOException, InterruptedException {
+ runTestCase();
+ }
+
+ public void runTestCase() throws IOException, InterruptedException {
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ snapshotManager = SnapshotManager.getInstance(kylinConfig);
+ SnapshotTable origin =
snapshotManager.buildSnapshot(genTable("./origin", expect),
genTableDesc("TEST_TABLE"),
+ kylinConfig);
+
+ // sleep 1 second to avoid file resource store timestamp precision loss
+ Thread.sleep(1000);
+ SnapshotTable dup = snapshotManager.buildSnapshot(genTable("./dup",
expect), genTableDesc("TEST_TABLE"),
+ kylinConfig);
+ // assert same snapshot file
+ Assert.assertEquals(origin.getUuid(), dup.getUuid());
+ Assert.assertEquals(origin.getResourcePath(), dup.getResourcePath());
+
+ // assert the file has been updated
+ long originLastModified = origin.getLastModified();
+ long dupLastModified = dup.getLastModified();
+ Assert.assertTrue(dupLastModified > originLastModified);
+
+ SnapshotTable actual =
snapshotManager.getSnapshotTable(origin.getResourcePath());
+ IReadableTable.TableReader reader = actual.getReader();
+ Assert.assertEquals(expect.size(), actual.getRowCount());
+ int i = 0;
+ while (reader.next()) {
+ Assert.assertEquals(stringJoin(expect.get(i++)),
stringJoin(reader.getRow()));
+ }
+
+ SnapshotTable difTable =
snapshotManager.buildSnapshot(genTable("./dif", dif),
genTableDesc("TEST_TABLE"),
+ kylinConfig);
+ Assert.assertNotEquals(origin.getUuid(), difTable.getUuid());
+ }
+
+ @Test
+ public void testBuildSameSnapshotSameTime() throws InterruptedException,
IOException {
+ final int threadCount = 3;
+ final ExecutorService executorService =
Executors.newFixedThreadPool(threadCount);
+ final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
+ final TableDesc tableDesc = genTableDesc("TEST_TABLE");
+
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ snapshotManager = SnapshotManager.getInstance(kylinConfig);
+ ResourceStore store = ResourceStore.getStore(kylinConfig);
+
+ for (int i = 0; i < threadCount; ++i) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ snapshotManager.buildSnapshot(genTable("./origin",
expect), tableDesc, kylinConfig);
+ } catch (IOException e) {
+ Assert.fail();
+ } finally {
+ countDownLatch.countDown();
+ }
+ }
+ });
+ }
+ countDownLatch.await();
+ Assert.assertEquals(1,
store.listResources("/table_snapshot/NULL.TEST_TABLE").size());
+ }
+
+ private String stringJoin(String[] strings) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < strings.length; i++) {
+ builder.append(strings[i]);
+ if (i < strings.length - 1) {
+ builder.append(",");
+ }
+ }
+ return builder.toString();
+ }
+}
diff --git
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index aeb7b12..6ab4976 100644
---
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -91,13 +91,15 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
}
FileSystem fs = HadoopUtil.getWorkingFileSystem();
- Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+ Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
+ col.getName() +
FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
if (dictFile == null) {
- logger.info("Dict for '" + col.getName() + "' not
pre-built.");
+ logger.info("Dict for '{}' not pre-built.", col.getName());
return null;
}
- try (SequenceFile.Reader reader = new
SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(),
SequenceFile.Reader.file(dictFile))) {
+ try (SequenceFile.Reader reader = new
SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(),
+ SequenceFile.Reader.file(dictFile))) {
NullWritable key = NullWritable.get();
ArrayPrimitiveWritable value = new
ArrayPrimitiveWritable();
reader.next(key, value);
@@ -107,7 +109,7 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
String dictClassName = is.readUTF();
Dictionary<String> dict = (Dictionary<String>)
ClassUtil.newInstance(dictClassName);
dict.readFields(is);
- logger.info("DictionaryProvider read dict from file: "
+ dictFile);
+ logger.info("DictionaryProvider read dict from file:
{}", dictFile);
return dict;
}
}
diff --git
a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
index dd84bd6..e432cdb 100755
--- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java
@@ -19,7 +19,7 @@
package org.apache.kylin.cube;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
import java.io.BufferedWriter;
import java.io.File;
@@ -73,14 +73,17 @@ public class ITDictionaryManagerTest extends
LocalFileMetadataTestCase {
DictionaryInfo info1 = dictMgr.buildDictionary(col,
mockupData.getDistinctValuesFor(col));
System.out.println(JsonUtil.writeValueAsIndentString(info1));
+ Thread.sleep(1000);
+
DictionaryInfo info2 = dictMgr.buildDictionary(col,
mockupData.getDistinctValuesFor(col));
System.out.println(JsonUtil.writeValueAsIndentString(info2));
// test check duplicate
- assertTrue(info1.getUuid() == info2.getUuid());
- assertTrue(info1 ==
dictMgr.getDictionaryInfo(info1.getResourcePath()));
- assertTrue(info2 ==
dictMgr.getDictionaryInfo(info2.getResourcePath()));
- assertTrue(info1.getDictionaryObject() == info2.getDictionaryObject());
+ assertEquals(info1.getUuid(), info2.getUuid());
+ assertEquals(info1.getResourcePath(), info2.getResourcePath());
+ assertNotEquals(info1.getLastModified(), info2.getLastModified());
+ assertNotEquals(info1, info2);
+ assertEquals(info1.getDictionaryObject(), info2.getDictionaryObject());
// verify dictionary entries
@SuppressWarnings("unchecked")
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index e11fe74..360818b 100644
---
a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++
b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -20,13 +20,14 @@ package org.apache.kylin.rest.job;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,12 +40,14 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryInfoSerializer;
import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.execution.ExecutableState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class MetadataCleanupJob {
@@ -56,37 +59,39 @@ public class MetadataCleanupJob {
//
============================================================================
final KylinConfig config;
-
- private List<String> garbageResources = Collections.emptyList();
-
+
+ private Map<String, Long> garbageResources = Maps.newHashMap();
+ private ResourceStore store;
+
public MetadataCleanupJob() {
this(KylinConfig.getInstanceFromEnv());
}
-
+
public MetadataCleanupJob(KylinConfig config) {
this.config = config;
+ this.store = ResourceStore.getStore(config);
}
-
- public List<String> getGarbageResources() {
+
+ public Map<String, Long> getGarbageResources() {
return garbageResources;
}
// function entrance
- public List<String> cleanup(boolean delete, int jobOutdatedDays) throws
Exception {
+ public Map<String, Long> cleanup(boolean delete, int jobOutdatedDays)
throws Exception {
CubeManager cubeManager = CubeManager.getInstance(config);
- ResourceStore store = ResourceStore.getStore(config);
long newResourceTimeCut = System.currentTimeMillis() -
NEW_RESOURCE_THREADSHOLD_MS;
FileSystem fs =
HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
- List<String> toDeleteCandidates = Lists.newArrayList();
+ Map<String, Long> toDeleteCandidates = Maps.newHashMap();
// two level resources, snapshot tables and cube statistics
for (String resourceRoot : new String[] {
ResourceStore.SNAPSHOT_RESOURCE_ROOT,
- ResourceStore.CUBE_STATISTICS_ROOT,
ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT}) {
+ ResourceStore.CUBE_STATISTICS_ROOT,
ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT }) {
for (String dir : noNull(store.listResources(resourceRoot))) {
for (String res : noNull(store.listResources(dir))) {
- if (store.getResourceTimestamp(res) < newResourceTimeCut)
- toDeleteCandidates.add(res);
+ long timestamp = getTimestamp(res);
+ if (timestamp < newResourceTimeCut)
+ toDeleteCandidates.put(res, timestamp);
}
}
}
@@ -94,14 +99,18 @@ public class MetadataCleanupJob {
// find all of the global dictionaries in HDFS
try {
FileStatus[] fStatus = new FileStatus[0];
- fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new
Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() +
"resources/GlobalDict/dict")));
- fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new
Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() +
"resources/SegmentDict/dict")));
+ fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(
+ KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ "resources/GlobalDict/dict")));
+ fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(
+ KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ "resources/SegmentDict/dict")));
for (FileStatus status : fStatus) {
String path = status.getPath().toString();
FileStatus[] globalDicts = fs.listStatus(new Path(path));
for (FileStatus globalDict : globalDicts) {
String globalDictPath = globalDict.getPath().toString();
- toDeleteCandidates.add(globalDictPath);
+ long timestamp = getTimestamp(globalDict);
+ if (timestamp < newResourceTimeCut)
+ toDeleteCandidates.put(globalDictPath, timestamp);
}
}
} catch (FileNotFoundException e) {
@@ -113,8 +122,9 @@ public class MetadataCleanupJob {
for (String dir : noNull(store.listResources(resourceRoot))) {
for (String dir2 : noNull(store.listResources(dir))) {
for (String res : noNull(store.listResources(dir2))) {
- if (store.getResourceTimestamp(res) <
newResourceTimeCut)
- toDeleteCandidates.add(res);
+ long timestamp = getTimestamp(res);
+ if (timestamp < newResourceTimeCut)
+ toDeleteCandidates.put(res, timestamp);
}
}
}
@@ -130,13 +140,16 @@ public class MetadataCleanupJob {
activeResources.add(segment.getStatisticsResourcePath());
for (String dictPath : segment.getDictionaryPaths()) {
DictionaryInfo dictInfo = store.getResource(dictPath,
DictionaryInfoSerializer.FULL_SERIALIZER);
- if
("org.apache.kylin.dict.AppendTrieDictionary".equals(dictInfo != null ?
dictInfo.getDictionaryClass() : null)){
+ if ("org.apache.kylin.dict.AppendTrieDictionary"
+ .equals(dictInfo != null ?
dictInfo.getDictionaryClass() : null)) {
String dictObj =
dictInfo.getDictionaryObject().toString();
String basedir =
dictObj.substring(dictObj.indexOf("(") + 1, dictObj.indexOf(")") - 1);
- if
(basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ "/resources/GlobalDict")) {
+ if (basedir.startsWith(
+
KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() +
"/resources/GlobalDict")) {
activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ "resources/GlobalDict" +
dictInfo.getResourceDir());
- } else if
(basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ "/resources/SegmentDict")) {
+ } else if
(basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ + "/resources/SegmentDict")) {
activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ "resources/SegmentDict" +
dictInfo.getResourceDir());
}
@@ -144,7 +157,7 @@ public class MetadataCleanupJob {
}
}
}
- toDeleteCandidates.removeAll(activeResources);
+ toDeleteCandidates.keySet().removeAll(activeResources);
// delete old and completed jobs
long outdatedJobTimeCut = System.currentTimeMillis() - jobOutdatedDays
* 24 * 3600 * 1000L;
@@ -152,49 +165,77 @@ public class MetadataCleanupJob {
List<ExecutablePO> allExecutable = executableDao.getJobs();
for (ExecutablePO executable : allExecutable) {
long lastModified = executable.getLastModified();
- String jobStatus =
executableDao.getJobOutput(executable.getUuid()).getStatus();
-
- if (lastModified < outdatedJobTimeCut &&
(ExecutableState.SUCCEED.toString().equals(jobStatus)
- ||
ExecutableState.DISCARDED.toString().equals(jobStatus))) {
- toDeleteCandidates.add(ResourceStore.EXECUTE_RESOURCE_ROOT +
"/" + executable.getUuid());
-
toDeleteCandidates.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" +
executable.getUuid());
+ if (lastModified < outdatedJobTimeCut &&
isJobComplete(executableDao, executable)) {
+ String jobResPath = ResourceStore.EXECUTE_RESOURCE_ROOT + "/"
+ executable.getUuid();
+ String jobOutputResPath =
ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid();
+ long outputLastModified = getTimestamp(jobOutputResPath);
+ toDeleteCandidates.put(jobResPath, lastModified);
+ toDeleteCandidates.put(jobOutputResPath, outputLastModified);
- for (ExecutablePO task : executable.getTasks()) {
-
toDeleteCandidates.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" +
task.getUuid());
+ List<ExecutablePO> tasks = executable.getTasks();
+ if (tasks != null && !tasks.isEmpty()) {
+ for (ExecutablePO task : executable.getTasks()) {
+ String taskId = task.getUuid();
+ if (StringUtils.isNotBlank(taskId)) {
+ String resPath =
ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid();
+ long timestamp = getTimestamp(resPath);
+ toDeleteCandidates.put(resPath, timestamp);
+ }
+ }
}
}
}
-
+
garbageResources = cleanupConclude(delete, toDeleteCandidates);
return garbageResources;
}
- private List<String> cleanupConclude(boolean delete, List<String>
toDeleteResources) throws IOException {
+ private boolean isJobComplete(ExecutableDao executableDao, ExecutablePO
job) {
+ String jobId = job.getUuid();
+ boolean isComplete = false;
+ try {
+ ExecutableOutputPO output = executableDao.getJobOutput(jobId);
+ String status = output.getStatus();
+ if (StringUtils.equals(status, ExecutableState.SUCCEED.toString())
+ || StringUtils.equals(status,
ExecutableState.DISCARDED.toString())) {
+ isComplete = true;
+ }
+ } catch (PersistentException e) {
+ logger.error("Get job output failed for job uuid: {}", jobId, e);
+ isComplete = true; // job output broken --> will be treated as complete
+ }
+
+ return isComplete;
+ }
+
+ private Map<String, Long> cleanupConclude(boolean delete, Map<String,
Long> toDeleteResources) throws IOException {
if (toDeleteResources.isEmpty()) {
logger.info("No metadata resource to clean up");
return toDeleteResources;
}
-
- logger.info(toDeleteResources.size() + " metadata resource to clean
up");
+
+ logger.info("{} metadata resource to clean up",
toDeleteResources.size());
if (delete) {
ResourceStore store = ResourceStore.getStore(config);
FileSystem fs =
HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
- for (String res : toDeleteResources) {
- logger.info("Deleting metadata " + res);
+ for (String res : toDeleteResources.keySet()) {
+ long timestamp = toDeleteResources.get(res);
+ logger.info("Deleting metadata=[resource_path: {}, timestamp:
{}]", res, timestamp);
try {
if
(res.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())) {
fs.delete(new Path(res), true);
} else {
- store.deleteResource(res);
+ store.deleteResource(res, timestamp);
}
} catch (IOException e) {
- logger.error("Failed to delete resource " + res, e);
+ logger.error("Failed to delete metadata=[resource_path:
{}, timestamp: {}] ", res, timestamp, e);
}
}
} else {
- for (String res : toDeleteResources) {
- logger.info("Dry run, pending delete metadata " + res);
+ for (String res : toDeleteResources.keySet()) {
+ long timestamp = toDeleteResources.get(res);
+ logger.info("Dry run, pending delete metadata=[resource_path:
{}, timestamp: {}] ", res, timestamp);
}
}
return toDeleteResources;
@@ -204,4 +245,17 @@ public class MetadataCleanupJob {
return (list == null) ? new TreeSet<String>() : list;
}
+ private long getTimestamp(String resPath) {
+ long timestamp = Long.MAX_VALUE;
+ try {
+ timestamp = store.getResourceTimestamp(resPath);
+ } catch (IOException e) {
+ logger.warn("Failed to get resource timestamp from remote resource
store, details:{}", e);
+ }
+ return timestamp;
+ }
+
+ private long getTimestamp(FileStatus filestatus) {
+ return filestatus.getModificationTime();
+ }
}
diff --git
a/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java
b/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java
index fa85a1e..ad819e6 100644
---
a/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java
+++
b/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java
@@ -25,7 +25,7 @@ import static
org.apache.kylin.common.util.LocalFileMetadataTestCase.staticCreat
import java.io.File;
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
@@ -42,18 +42,22 @@ public class MetadataCleanupJobTest {
@Test
public void testCleanUp() throws Exception {
- staticCreateTestMetadata(false, new ResetTimeHook(1,
"src/test/resources/test_meta"));
+ // the file resource store may lose millisecond timestamp precision, so set last modified to 2000
+ staticCreateTestMetadata(false, new ResetTimeHook(2000,
"src/test/resources/test_meta"));
MetadataCleanupJob metadataCleanupJob = new MetadataCleanupJob();
- List<String> cleanupList = metadataCleanupJob.cleanup(false, 30);
- Assert.assertEquals(7, cleanupList.size());
+ Map<String, Long> cleanupMap = metadataCleanupJob.cleanup(false, 30);
+ Assert.assertEquals(7, cleanupMap.size());
+ for (long timestamp : cleanupMap.values()) {
+ Assert.assertEquals(2000, timestamp);
+ }
}
@Test
public void testNotCleanUp() throws Exception {
staticCreateTestMetadata(false, new
ResetTimeHook(System.currentTimeMillis(), "src/test/resources/test_meta"));
MetadataCleanupJob metadataCleanupJob = new MetadataCleanupJob();
- List<String> cleanupList = metadataCleanupJob.cleanup(false, 30);
- Assert.assertEquals(0, cleanupList.size());
+ Map<String, Long> cleanupMap = metadataCleanupJob.cleanup(false, 30);
+ Assert.assertEquals(0, cleanupMap.size());
}
private class ResetTimeHook extends
LocalFileMetadataTestCase.OverlayMetaHook {
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 14c5ea7..ecd698f 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -147,7 +147,7 @@ public class HBaseResourceStore extends
PushdownResourceStore {
@Override
protected void visitFolderImpl(String folderPath, final boolean recursive,
VisitFilter filter,
- final boolean loadContent, final Visitor
visitor) throws IOException {
+ final boolean loadContent, final Visitor visitor) throws
IOException {
visitFolder(folderPath, filter, loadContent, new FolderVisitor() {
@Override
@@ -167,7 +167,8 @@ public class HBaseResourceStore extends
PushdownResourceStore {
});
}
- private void visitFolder(String folderPath, VisitFilter filter, boolean
loadContent, FolderVisitor visitor) throws IOException {
+ private void visitFolder(String folderPath, VisitFilter filter, boolean
loadContent, FolderVisitor visitor)
+ throws IOException {
assert folderPath.startsWith("/");
String folderPrefix = folderPath.endsWith("/") ? folderPath :
folderPath + "/";
@@ -244,7 +245,7 @@ public class HBaseResourceStore extends
PushdownResourceStore {
CompareFilter.CompareOp.LESS,
Bytes.toBytes(visitFilter.lastModEndExclusive));
filterList.addFilter(timeEndFilter);
}
- return filterList.getFilters().size() == 0 ? null : filterList;
+ return filterList.getFilters().isEmpty() ? null : filterList;
}
private InputStream getInputStream(String resPath, Result r) throws
IOException {
@@ -352,18 +353,40 @@ public class HBaseResourceStore extends
PushdownResourceStore {
}
@Override
- protected void deleteResourceImpl(String resPath) throws IOException {
+ protected void updateTimestampImpl(String resPath, long timestamp) throws
IOException {
Table table = getConnection().getTable(TableName.valueOf(tableName));
try {
- boolean hdfsResourceExist = false;
- Result result = internalGetFromHTable(table, resPath, true, false);
- if (result != null) {
- byte[] value = result.getValue(B_FAMILY, B_COLUMN);
- if (value != null && value.length == 0) {
- hdfsResourceExist = true;
- }
+ boolean hdfsResourceExist = isHdfsResourceExist(table, resPath);
+ long oldTS = getResourceLastModified(table, resPath);
+
+ byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
+ byte[] row = Bytes.toBytes(resPath);
+ Put put = new Put(row);
+ put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(timestamp));
+
+ boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS,
put);
+ logger.trace("Update row {} from oldTs: {}, to newTs: {},
operation result: {}", resPath, oldTS, timestamp,
+ ok);
+ if (!ok) {
+ long real = getResourceTimestampImpl(resPath);
+ throw new WriteConflictException(
+ "Overwriting conflict " + resPath + ", expect old TS "
+ oldTS + ", but it is " + real);
}
+ if (hdfsResourceExist) { // update timestamp in hdfs
+ updateTimestampPushdown(resPath, timestamp);
+ }
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ @Override
+ protected void deleteResourceImpl(String resPath) throws IOException {
+ Table table = getConnection().getTable(TableName.valueOf(tableName));
+ try {
+ boolean hdfsResourceExist = isHdfsResourceExist(table, resPath);
+
Delete del = new Delete(Bytes.toBytes(resPath));
table.delete(del);
@@ -376,6 +399,43 @@ public class HBaseResourceStore extends
PushdownResourceStore {
}
@Override
+ protected void deleteResourceImpl(String resPath, long timestamp) throws
IOException {
+ Table table = getConnection().getTable(TableName.valueOf(tableName));
+ try {
+ boolean hdfsResourceExist = isHdfsResourceExist(table, resPath);
+ long origLastModified = getResourceLastModified(table, resPath);
+ if (checkTimeStampBeforeDelete(origLastModified, timestamp)) {
+ Delete del = new Delete(Bytes.toBytes(resPath));
+ table.delete(del);
+
+ if (hdfsResourceExist) { // remove hdfs cell value
+ deletePushdown(resPath);
+ }
+ }
+ } finally {
+ IOUtils.closeQuietly(table);
+ }
+ }
+
+ // avoid getting the Table twice, to improve delete performance
+ private long getResourceLastModified(Table table, String resPath) throws
IOException {
+ return getTimestamp(internalGetFromHTable(table, resPath, false,
true));
+ }
+
+ private boolean isHdfsResourceExist(Table table, String resPath) throws
IOException {
+ boolean hdfsResourceExist = false;
+ Result result = internalGetFromHTable(table, resPath, true, false);
+ if (result != null) {
+ byte[] contentVal = result.getValue(B_FAMILY, B_COLUMN);
+ if (contentVal != null && contentVal.length == 0) {
+ hdfsResourceExist = true;
+ }
+ }
+
+ return hdfsResourceExist;
+ }
+
+ @Override
protected String getReadableResourcePathImpl(String resPath) {
return tableName + "(key='" + resPath + "')@" +
kylinConfig.getMetadataUrl();
}