This is an automated email from the ASF dual-hosted git repository.
liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 831f9a6892 KYLIN-5963 Building snapshot with new storage type
831f9a6892 is described below
commit 831f9a6892778d27f1c15ba7be1995bb8a8f5212
Author: Zhimin Wu <[email protected]>
AuthorDate: Thu Aug 29 18:53:17 2024 +0800
KYLIN-5963 Building snapshot with new storage type
1. Building snapshot with new storage type
2. Fix storage cleaner
3. Fix streaming job uuid length
---
pom.xml | 5 -----
.../src/main/resources/metadata-jdbc-h2.properties | 2 +-
.../src/main/resources/metadata-jdbc-mysql.properties | 2 +-
.../src/main/resources/metadata-jdbc-postgresql.properties | 2 +-
.../kylin/metadata/cube/model/NDataLayoutDetails.java | 4 ++++
.../engine/spark/job/stage/build/RefreshSnapshots.scala | 4 ++--
.../spark/job/stage/build/RefreshSnapshotsTest.scala | 1 +
.../java/org/apache/kylin/tool/garbage/StorageCleaner.java | 14 +++++++-------
.../java/org/apache/kylin/tool/StorageCleanerTest.java | 12 ++++++++++++
.../1/keep | 0
.../20001}/keep | 0
.../30001}/keep | 0
.../invalid/keep | 0
.../delta/82fa7671-a935-45f5-8779-85703601f49a/1/keep | 0
.../storage_v3_test/delta/invalid/1/keep | 0
15 files changed, 29 insertions(+), 17 deletions(-)
diff --git a/pom.xml b/pom.xml
index aef270dfa3..6ee503770a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2229,11 +2229,6 @@
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-core</artifactId>
- <version>${iceberg.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
index 2a458bdbb7..a782d17047 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
@@ -508,7 +508,7 @@ CREATE TABLE IF NOT EXISTS %s_streaming_job\
id bigint AUTO_INCREMENT NOT NULL,\
meta_key varchar(255) NOT NULL,\
project varchar(255) NOT NULL,\
-uuid CHAR(36) NOT NULL,\
+uuid CHAR(64) NOT NULL,\
mvcc bigint NOT NULL,\
ts bigint NOT NULL,\
content bytea NOT NULL,\
diff --git a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
index be435c91c4..f61640a013 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -565,7 +565,7 @@ CREATE TABLE IF NOT EXISTS `%s_streaming_job` \
`id` bigint unsigned NOT NULL AUTO_INCREMENT, \
`meta_key` varchar(255) NOT NULL, \
`project` varchar(255) NOT NULL, \
-`uuid` CHAR(36) NOT NULL COLLATE utf8_bin, \
+`uuid` CHAR(64) NOT NULL COLLATE utf8_bin, \
`mvcc` bigint NOT NULL, \
`ts` bigint NOT NULL, \
`content` longblob NOT NULL, \
diff --git a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
index 59dc3bcb12..c81fd29676 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
@@ -508,7 +508,7 @@ CREATE TABLE IF NOT EXISTS %s_streaming_job\
id bigserial NOT NULL,\
meta_key varchar(255) NOT NULL,\
project varchar(255) NOT NULL,\
-uuid CHAR(36) COLLATE "C" NOT NULL,\
+uuid CHAR(64) COLLATE "C" NOT NULL,\
mvcc bigint NOT NULL,\
ts bigint NOT NULL,\
content bytea NOT NULL,\
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayoutDetails.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayoutDetails.java
index 021b291b2b..7f3b3a3f28 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayoutDetails.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayoutDetails.java
@@ -186,4 +186,8 @@ public class NDataLayoutDetails extends RootPersistentEntity implements Serializ
return this.minCompactionFileSizeInBytes;
}
}
+
+ public String getRelativeStoragePath() {
+ return project + "/delta/" + getModelId() + "/" + getLayoutId();
+ }
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
index 30d9d67f2a..fbfc0fe814 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
@@ -38,8 +38,8 @@ class RefreshSnapshots(jobContext: SegmentJob) extends StageExec {
}
override def execute(): Unit = {
- (jobContext.getDataflow(jobContext.getDataflowId).getModel.getStorageType, jobContext) match {
- case (storageType, job: SegmentBuildJob) if storageType.isV1Storage =>
+ jobContext match {
+ case job: SegmentBuildJob =>
job.tryRefreshSnapshots(this)
case _ =>
}
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
index 82931d40ca..db8ade6d58 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
@@ -29,6 +29,7 @@ class RefreshSnapshotsTest extends AnyFunSuite {
val segmentJob = Mockito.mock(classOf[SegmentJob])
val refreshSnapshots = new RefreshSnapshots(segmentJob)
+ refreshSnapshots.execute()
Assert.assertEquals("RefreshSnapshots", refreshSnapshots.getStageName)
}
}
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
index 1f01b2475f..2f46f6580c 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
@@ -169,7 +169,7 @@ public class StorageCleaner implements GarbageCleaner {
allFileSystems.add(new StorageItem(FileSystemDecorator.getInstance(HadoopUtil.getWorkingFileSystem()),
config.getHdfsWorkingDirectory()));
// Check if independent storage of flat tables under read/write separation is enabled
- // For build tasks it is a project-level parameter(Higher project-level priority),
+ // For build tasks it is a project-level parameter(Higher project-level priority),
// but for cleaning up storage garbage,
// WRITING_CLUSTER_WORKING_DIR is a system-level parameter
if (kylinConfig.isBuildFilesSeparationEnabled()) {
@@ -699,10 +699,10 @@ public class StorageCleaner implements GarbageCleaner {
val dataflows = NDataflowManager.getInstance(config, project).listAllDataflows().stream()
.map(RootPersistentEntity::getId).collect(Collectors.toSet());
val deltaDataFlow = NDataflowManager.getInstance(config, project).listAllDataflows().stream()
- .filter(df -> df.getModel().getStorageType().isDeltaStorage()).map(RootPersistentEntity::getId)
- .collect(Collectors.toSet());
- val dataLayoutDetails = NDataflowManager.getInstance(config, project).listAllDataflows().stream()
- .flatMap(df -> df.listAllLayoutDetails().stream().map(NDataLayoutDetails::getResourcePath))
+ .filter(df -> df.getModel().getStorageType().isDeltaStorage())
+ .map(RootPersistentEntity::getId).collect(Collectors.toSet());
+ val activeDeltaLayoutData = NDataflowManager.getInstance(config, project).listAllDataflows().stream()
+ .flatMap(df -> df.listAllLayoutDetails().stream().map(NDataLayoutDetails::getRelativeStoragePath))
.collect(Collectors.toSet());
// set activeSegmentFlatTableDataPath, by iterating segments
dataflowManager.listAllDataflows().forEach(df -> df.getSegments().stream() //
@@ -723,8 +723,8 @@ public class StorageCleaner implements GarbageCleaner {
for (StorageCleaner.StorageItem item : allFileSystems) {
item.getProject(project).getDataflows().removeIf(node -> dataflows.contains(node.getName()));
item.getProject(project).getDeltaDataFlows().removeIf(node -> deltaDataFlow.contains(node.getName()));
- item.getProject(project).getDeltaDataLayouts()
- .removeIf(node -> dataLayoutDetails.contains(node.getRelativePath()));
+ item.getProject(project).getDeltaDataLayouts().removeIf(node ->
+ activeDeltaLayoutData.contains(node.getRelativePath()));
item.getProject(project).getSegments()
.removeIf(node -> activeSegmentPath.contains(node.getRelativePath()));
item.getProject(project).getLayouts()
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
index 16aa1127d5..dd4a3483f9 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
@@ -113,7 +113,19 @@ public class StorageCleanerTest extends NLocalFileMetadataTestCase {
val config = getTestConfig();
FileUtils.copyDirectory(new File("src/test/resources/ut_storage/delta_storage_cleaner_test"),
new File(config.getHdfsWorkingDirectory().replace("file://", "")));
+ val baseDir = new File(getTestConfig().getMetadataUrl().getIdentifier()).getParentFile();
+ val files = FileUtils.listFiles(new File(baseDir, "working-dir"), null, true);
+ val garbageFiles = files.stream().filter(f -> f.getAbsolutePath().contains("invalid")
+ && f.getAbsolutePath().contains("storage_v3_test"))
+ .map(f -> FilenameUtils.normalize(f.getParentFile().getAbsolutePath())).collect(Collectors.toSet());
cleaner.execute();
+ val outDateItem = cleaner.getOutdatedItems().stream()
+ .filter(out -> out.getPath().contains("storage_v3_test"))
+ .collect(Collectors.toSet());
+ Assert.assertEquals(garbageFiles.size(), outDateItem.size());
+ for (String outdatedItem : normalizeGarbages(outDateItem)) {
+ Assert.assertTrue(outdatedItem + " not in garbageFiles", garbageFiles.contains(outdatedItem));
+ }
}
@Test
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/4e69fc98-d12e-443e-81ff-a8b45a73e48a/1/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/1/keep
similarity index 100%
rename from src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/4e69fc98-d12e-443e-81ff-a8b45a73e48a/1/keep
rename to src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/1/keep
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/10002/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/20001/keep
similarity index 100%
rename from src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/10002/keep
rename to src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/20001/keep
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/10003/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/30001/keep
similarity index 100%
rename from src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/10003/keep
rename to src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/30001/keep
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/invalid/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/invalid/keep
similarity index 100%
rename from src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/invalid/keep
rename to src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/invalid/keep
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/82fa7671-a935-45f5-8779-85703601f49a/1/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/82fa7671-a935-45f5-8779-85703601f49a/1/keep
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/invalid/1/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/invalid/1/keep
deleted file mode 100644
index e69de29bb2..0000000000