This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 831f9a6892 KYLIN-5963 Building snapshot with new storage type
831f9a6892 is described below

commit 831f9a6892778d27f1c15ba7be1995bb8a8f5212
Author: Zhimin Wu <[email protected]>
AuthorDate: Thu Aug 29 18:53:17 2024 +0800

    KYLIN-5963 Building snapshot with new storage type
    
    1. Building snapshot with new storage type
    2. Fix storage cleaner
    3. Fix streaming job uuid length
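
Change 3 widens the uuid column of the %s_streaming_job table from CHAR(36)
to CHAR(64) in the H2, MySQL, and PostgreSQL metadata schemas (diffs below).
A canonical UUID string is exactly 36 characters, so the old column only fit
bare UUIDs. A minimal Java sketch of the width constraint, assuming a
hypothetical suffixed-identifier scheme for streaming jobs:

    import java.util.UUID;

    public class StreamingJobUuidWidth {
        public static void main(String[] args) {
            // A canonical UUID renders as 36 chars (32 hex digits + 4 dashes).
            String plainUuid = UUID.randomUUID().toString();
            System.out.println(plainUuid.length()); // 36 -> fits CHAR(36)

            // Hypothetical: a UUID plus a job suffix overflows CHAR(36)
            // but still fits the widened CHAR(64) column.
            String suffixed = plainUuid + "_streaming_build";
            System.out.println(suffixed.length()); // 52 -> needs CHAR(64)
        }
    }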
---
 pom.xml                                                    |  5 -----
 .../src/main/resources/metadata-jdbc-h2.properties         |  2 +-
 .../src/main/resources/metadata-jdbc-mysql.properties      |  2 +-
 .../src/main/resources/metadata-jdbc-postgresql.properties |  2 +-
 .../kylin/metadata/cube/model/NDataLayoutDetails.java      |  4 ++++
 .../engine/spark/job/stage/build/RefreshSnapshots.scala    |  4 ++--
 .../spark/job/stage/build/RefreshSnapshotsTest.scala       |  1 +
 .../java/org/apache/kylin/tool/garbage/StorageCleaner.java | 14 +++++++-------
 .../java/org/apache/kylin/tool/StorageCleanerTest.java     | 12 ++++++++++++
 .../1/keep                                                 |  0
 .../20001}/keep                                            |  0
 .../30001}/keep                                            |  0
 .../invalid/keep                                           |  0
 .../delta/82fa7671-a935-45f5-8779-85703601f49a/1/keep      |  0
 .../storage_v3_test/delta/invalid/1/keep                   |  0
 15 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/pom.xml b/pom.xml
index aef270dfa3..6ee503770a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2229,11 +2229,6 @@
                 <artifactId>iceberg-core</artifactId>
                 <version>${iceberg.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.apache.iceberg</groupId>
-                <artifactId>iceberg-core</artifactId>
-                <version>${iceberg.version}</version>
-            </dependency>
             <dependency>
                 <groupId>org.apache.iceberg</groupId>
                 <artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
index 2a458bdbb7..a782d17047 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
@@ -508,7 +508,7 @@ CREATE TABLE IF NOT EXISTS %s_streaming_job\
 id               bigint AUTO_INCREMENT NOT NULL,\
 meta_key         varchar(255)          NOT NULL,\
 project          varchar(255)          NOT NULL,\
-uuid             CHAR(36)              NOT NULL,\
+uuid             CHAR(64)              NOT NULL,\
 mvcc             bigint                NOT NULL,\
 ts               bigint                NOT NULL,\
 content          bytea                 NOT NULL,\
diff --git a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
index be435c91c4..f61640a013 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -565,7 +565,7 @@ CREATE TABLE IF NOT EXISTS `%s_streaming_job` \
 `id`               bigint unsigned NOT NULL AUTO_INCREMENT, \
 `meta_key`         varchar(255)    NOT NULL, \
 `project`          varchar(255)    NOT NULL, \
-`uuid`             CHAR(36)        NOT NULL COLLATE utf8_bin, \
+`uuid`             CHAR(64)        NOT NULL COLLATE utf8_bin, \
 `mvcc`             bigint          NOT NULL, \
 `ts`               bigint          NOT NULL, \
 `content`          longblob        NOT NULL, \
diff --git a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
index 59dc3bcb12..c81fd29676 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
@@ -508,7 +508,7 @@ CREATE TABLE IF NOT EXISTS %s_streaming_job\
 id               bigserial            NOT NULL,\
 meta_key         varchar(255)         NOT NULL,\
 project          varchar(255)         NOT NULL,\
-uuid             CHAR(36) COLLATE "C" NOT NULL,\
+uuid             CHAR(64) COLLATE "C" NOT NULL,\
 mvcc             bigint               NOT NULL,\
 ts               bigint               NOT NULL,\
 content          bytea                NOT NULL,\
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayoutDetails.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayoutDetails.java
index 021b291b2b..7f3b3a3f28 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayoutDetails.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayoutDetails.java
@@ -186,4 +186,8 @@ public class NDataLayoutDetails extends RootPersistentEntity implements Serializ
             return this.minCompactionFileSizeInBytes;
         }
     }
+
+    public String getRelativeStoragePath() {
+        return project + "/delta/" + getModelId() + "/" + getLayoutId();
+    }
 }
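
The getRelativeStoragePath() method added above composes a delta layout's
storage location as <project>/delta/<modelId>/<layoutId>; StorageCleaner
(further below) compares these relative paths against on-disk directories to
decide which layout directories are still active. A minimal sketch of the
composition, with values borrowed from this commit's test fixtures:

    public class RelativeStoragePathSketch {
        public static void main(String[] args) {
            // The real method reads these from the NDataLayoutDetails entity;
            // the values here come from the renamed test fixtures below.
            String project = "storage_v3_test";
            String modelId = "53bb6ab4-8058-4696-bc06-597a5e9a9103";
            long layoutId = 1L;
            String path = project + "/delta/" + modelId + "/" + layoutId;
            // Prints: storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/1
            System.out.println(path);
        }
    }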
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
index 30d9d67f2a..fbfc0fe814 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala
@@ -38,8 +38,8 @@ class RefreshSnapshots(jobContext: SegmentJob) extends StageExec {
   }
 
   override def execute(): Unit = {
-    (jobContext.getDataflow(jobContext.getDataflowId).getModel.getStorageType, jobContext) match {
-      case (storageType, job: SegmentBuildJob) if storageType.isV1Storage =>
+    jobContext match {
+      case job: SegmentBuildJob =>
         job.tryRefreshSnapshots(this)
       case _ =>
     }
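
The match above previously refreshed snapshots only when the model's storage
type was V1; after this change any SegmentBuildJob refreshes them, which is
what lets the new storage type build snapshots (change 1 in the commit
message). A minimal Java sketch of the simplified dispatch, with hypothetical
stand-in types replacing the Scala match:

    interface SegmentJob {}

    class SegmentBuildJob implements SegmentJob {
        void tryRefreshSnapshots() {
            System.out.println("refreshing snapshots");
        }
    }

    public class RefreshDispatchSketch {
        // Before: additionally required storageType.isV1Storage().
        // After: any SegmentBuildJob qualifies, so delta-storage models
        // build snapshots too; other job kinds remain a no-op.
        static void execute(SegmentJob jobContext) {
            if (jobContext instanceof SegmentBuildJob job) {
                job.tryRefreshSnapshots();
            }
        }

        public static void main(String[] args) {
            execute(new SegmentBuildJob());
        }
    }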
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
index 82931d40ca..db8ade6d58 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala
@@ -29,6 +29,7 @@ class RefreshSnapshotsTest extends AnyFunSuite {
     val segmentJob = Mockito.mock(classOf[SegmentJob])
 
     val refreshSnapshots = new RefreshSnapshots(segmentJob)
+    refreshSnapshots.execute()
     Assert.assertEquals("RefreshSnapshots", refreshSnapshots.getStageName)
   }
 }
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
index 1f01b2475f..2f46f6580c 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
@@ -169,7 +169,7 @@ public class StorageCleaner implements GarbageCleaner {
         allFileSystems.add(new StorageItem(FileSystemDecorator.getInstance(HadoopUtil.getWorkingFileSystem()),
                 config.getHdfsWorkingDirectory()));
         // Check if independent storage of flat tables under read/write separation is enabled
-        // For build tasks it is a project-level parameter(Higher project-level priority), 
+        // For build tasks it is a project-level parameter(Higher project-level priority),
         // but for cleaning up storage garbage,
         // WRITING_CLUSTER_WORKING_DIR is a system-level parameter
         if (kylinConfig.isBuildFilesSeparationEnabled()) {
@@ -699,10 +699,10 @@ public class StorageCleaner implements GarbageCleaner {
            val dataflows = NDataflowManager.getInstance(config, project).listAllDataflows().stream()
                    .map(RootPersistentEntity::getId).collect(Collectors.toSet());
            val deltaDataFlow = NDataflowManager.getInstance(config, project).listAllDataflows().stream()
-                    .filter(df -> df.getModel().getStorageType().isDeltaStorage()).map(RootPersistentEntity::getId)
-                    .collect(Collectors.toSet());
-            val dataLayoutDetails = NDataflowManager.getInstance(config, project).listAllDataflows().stream()
-                    .flatMap(df -> df.listAllLayoutDetails().stream().map(NDataLayoutDetails::getResourcePath))
+                    .filter(df -> df.getModel().getStorageType().isDeltaStorage())
+                    .map(RootPersistentEntity::getId).collect(Collectors.toSet());
+            val activeDeltaLayoutData = NDataflowManager.getInstance(config, project).listAllDataflows().stream()
+                    .flatMap(df -> df.listAllLayoutDetails().stream().map(NDataLayoutDetails::getRelativeStoragePath))
                    .collect(Collectors.toSet());
            // set activeSegmentFlatTableDataPath, by iterating segments
            dataflowManager.listAllDataflows().forEach(df -> df.getSegments().stream() //
@@ -723,8 +723,8 @@ public class StorageCleaner implements GarbageCleaner {
             for (StorageCleaner.StorageItem item : allFileSystems) {
                item.getProject(project).getDataflows().removeIf(node -> dataflows.contains(node.getName()));
                item.getProject(project).getDeltaDataFlows().removeIf(node -> deltaDataFlow.contains(node.getName()));
-                item.getProject(project).getDeltaDataLayouts()
-                        .removeIf(node -> dataLayoutDetails.contains(node.getRelativePath()));
+                item.getProject(project).getDeltaDataLayouts().removeIf(node ->
+                        activeDeltaLayoutData.contains(node.getRelativePath()));
                item.getProject(project).getSegments()
                        .removeIf(node -> activeSegmentPath.contains(node.getRelativePath()));
                 item.getProject(project).getLayouts()
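
In the hunk above, the cleaner collects the active delta layout paths via the
new getRelativeStoragePath() (replacing the previous getResourcePath-based
set) and removes them from the candidate lists, so whatever remains is
treated as garbage. A simplified, self-contained sketch of that removeIf
pattern, with illustrative path strings in place of Kylin's node types:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    public class DeltaLayoutFilterSketch {
        public static void main(String[] args) {
            // Relative paths of layouts still referenced in metadata.
            Set<String> activeDeltaLayoutData = Set.of(
                    "p/delta/model-a/1", "p/delta/model-a/20001");
            // Directories found on storage; all start as deletion candidates.
            List<String> candidates = new ArrayList<>(List.of(
                    "p/delta/model-a/1",        // active -> dropped from candidates
                    "p/delta/model-a/invalid",  // unknown -> remains garbage
                    "p/delta/invalid/1"));      // unknown -> remains garbage
            candidates.removeIf(activeDeltaLayoutData::contains);
            // Prints: [p/delta/model-a/invalid, p/delta/invalid/1]
            System.out.println(candidates);
        }
    }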
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
index 16aa1127d5..dd4a3483f9 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
@@ -113,7 +113,19 @@ public class StorageCleanerTest extends NLocalFileMetadataTestCase {
         val config = getTestConfig();
         FileUtils.copyDirectory(new File("src/test/resources/ut_storage/delta_storage_cleaner_test"),
                 new File(config.getHdfsWorkingDirectory().replace("file://", "")));
+        val baseDir = new File(getTestConfig().getMetadataUrl().getIdentifier()).getParentFile();
+        val files = FileUtils.listFiles(new File(baseDir, "working-dir"), null, true);
+        val garbageFiles = files.stream().filter(f -> f.getAbsolutePath().contains("invalid")
+                        && f.getAbsolutePath().contains("storage_v3_test"))
+                .map(f -> FilenameUtils.normalize(f.getParentFile().getAbsolutePath())).collect(Collectors.toSet());
         cleaner.execute();
+        val outDateItem = cleaner.getOutdatedItems().stream()
+                .filter(out -> out.getPath().contains("storage_v3_test"))
+                .collect(Collectors.toSet());
+        Assert.assertEquals(garbageFiles.size(), outDateItem.size());
+        for (String outdatedItem : normalizeGarbages(outDateItem)) {
+            Assert.assertTrue(outdatedItem + " not in garbageFiles", garbageFiles.contains(outdatedItem));
+        }
     }
 
     @Test
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/4e69fc98-d12e-443e-81ff-a8b45a73e48a/1/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/1/keep
similarity index 100%
rename from src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/4e69fc98-d12e-443e-81ff-a8b45a73e48a/1/keep
rename to src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/1/keep
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/10002/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/20001/keep
similarity index 100%
rename from src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/10002/keep
rename to src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/20001/keep
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/10003/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/30001/keep
similarity index 100%
rename from src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/10003/keep
rename to src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/30001/keep
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/invalid/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/invalid/keep
similarity index 100%
rename from src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/53bb6ab4-8058-4696-bc06-597a5e9a9103/invalid/keep
rename to src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/7d840904-7b34-4edd-aabd-79df992ef32e/invalid/keep
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/82fa7671-a935-45f5-8779-85703601f49a/1/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/82fa7671-a935-45f5-8779-85703601f49a/1/keep
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/invalid/1/keep b/src/tool/src/test/resources/ut_storage/delta_storage_cleaner_test/storage_v3_test/delta/invalid/1/keep
deleted file mode 100644
index e69de29bb2..0000000000
