This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e62f6ea42fc420045597b4d9d35b06a9a1606f52
Author: luoping.zhang <luoping.zh...@kyligence.io>
AuthorDate: Thu Apr 13 14:16:10 2023 +0800

    KYLIN-5636 automatically clean up dependent files after the build task
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 ++++++
 .../engine/spark/application/SparkApplication.java | 11 ++++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   | 24 +++++++++--------
 .../spark/application/SparkApplicationTest.java    | 31 +++++++++++++++++-----
 .../engine/spark/job/NSparkCubingJobTest.java      |  5 ++--
 5 files changed, 59 insertions(+), 20 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b2f8f3d843..753d4e00bd 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3962,4 +3962,12 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
TimeUtil.timeStringAs(getOptional("kylin.multi-tenant.route-task-timeout", 
"30min"),
                 TimeUnit.MILLISECONDS);
     }
+
+    public String getKubernetesUploadPath() {
+        return getOptional(getKubernetesUploadPathKey());
+    }
+
+    public String getKubernetesUploadPathKey() {
+        return "kylin.engine.spark-conf.spark.kubernetes.file.upload.path";
+    }
 }
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index bc0ba7b6d3..bae641e0b0 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -71,8 +71,11 @@ import 
org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
 import org.apache.kylin.engine.spark.job.SparkJobConstants;
 import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
+import org.apache.kylin.engine.spark.utils.HDFSUtils;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -417,6 +420,14 @@ public abstract class SparkApplication implements 
Application {
     }
 
     public void extraDestroy() {
+        if (config != null && 
StringUtils.isNotEmpty(config.getKubernetesUploadPath())) {
+            logger.info("uploadPath={}", config.getKubernetesUploadPath());
+            try {
+                HDFSUtils.deleteMarkFile(config.getKubernetesUploadPath());
+            } catch (Exception e) {
+                logger.warn("Failed to delete " + 
config.getKubernetesUploadPath(), e);
+            }
+        }
         if (clusterMonitor != null) {
             clusterMonitor.shutdown();
         }
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 9d30029819..f73da6d1bc 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -55,6 +55,11 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.StringHelper;
 import org.apache.kylin.engine.spark.merger.MetadataMerger;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.JobStoppedException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -70,18 +75,12 @@ import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
 import org.apache.kylin.metadata.view.LogicalView;
 import org.apache.kylin.metadata.view.LogicalViewManager;
+import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import lombok.val;
 
 /**
@@ -372,6 +371,11 @@ public class NSparkExecutable extends AbstractExecutable 
implements ChainedStage
                 jobOverrides.put("kylin.engine.spark-conf." + 
SPARK_YARN_QUEUE, yarnQueue);
             }
         }
+        String path = kylinConfigExt.getKubernetesUploadPath();
+        if (StringUtils.isNotEmpty(path)) {
+            jobOverrides.put(kylinConfigExt.getKubernetesUploadPathKey(),
+                    path + "/" + StringUtils.defaultIfBlank(parentId, 
getId()));
+        }
         return KylinConfigExt.createInstance(kylinConfigExt, jobOverrides);
     }
 
@@ -481,10 +485,8 @@ public class NSparkExecutable extends AbstractExecutable 
implements ChainedStage
         String dataflowId = getDataflowId();
         LogicalViewManager viewManager = 
LogicalViewManager.getInstance(config);
         if (StringUtils.isNotBlank(dataflowId)) {
-            Set<String> viewsMeta = viewManager
-                .findLogicalViewsInModel(getProject(), getDataflowId())
-                .stream().map(LogicalView::getResourcePath)
-                .collect(Collectors.toSet());
+            Set<String> viewsMeta = 
viewManager.findLogicalViewsInModel(getProject(), getDataflowId()).stream()
+                    
.map(LogicalView::getResourcePath).collect(Collectors.toSet());
             dumpList.addAll(viewsMeta);
         }
         if (StringUtils.isNotBlank(table)) {
diff --git 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
index 6ac8ae088e..cc44aedd0d 100644
--- 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
+++ 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
@@ -27,10 +27,14 @@ import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.engine.spark.job.ParamsConstants;
 import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
@@ -45,11 +49,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
-import org.apache.kylin.engine.spark.job.ParamsConstants;
+import org.springframework.test.util.ReflectionTestUtils;
 
 public class SparkApplicationTest extends NLocalWithSparkSessionTest {
 
@@ -131,8 +131,7 @@ public class SparkApplicationTest extends 
NLocalWithSparkSessionTest {
         Mockito.reset(report);
         
Mockito.doReturn("http://sandbox.hortonworks.com:8088/proxy/application_1561370224051_0160/").when(application)
                 .getTrackingUrl(null, ss);
-        Mockito.doReturn(Boolean.FALSE).when(report).updateSparkJobInfo(params,
-                "/kylin/api/jobs/spark", payloadJson);
+        
Mockito.doReturn(Boolean.FALSE).when(report).updateSparkJobInfo(params, 
"/kylin/api/jobs/spark", payloadJson);
         Assert.assertFalse(report.updateSparkJobExtraInfo(params, 
"/kylin/api/jobs/spark", "test_job_output",
                 "cb91189b-2b12-4527-aa35-0130e7d54ec0", extraInfo));
 
@@ -187,4 +186,22 @@ public class SparkApplicationTest extends 
NLocalWithSparkSessionTest {
         
Assert.assertTrue(sparkApplication.checkRangePartitionTableIsExist(nDataModel2));
     }
 
+    @Test
+    public void testExtraDestroy() throws IOException {
+        KylinConfig config = getTestConfig();
+        String path = tempDir.getPath() + "/upload";
+        SparkApplication application = new SparkApplication() {
+            @Override
+            protected void doExecute() {
+            }
+        };
+        File upload = new File(path);
+        FileUtils.forceMkdir(upload);
+        Assert.assertTrue(upload.exists());
+        config.setProperty(config.getKubernetesUploadPathKey(), path);
+        ReflectionTestUtils.setField(application, "config", config);
+        application.extraDestroy();
+        Assert.assertFalse(upload.exists());
+    }
+
 }
diff --git 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java
 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java
index a7a221a530..b86eb333f3 100644
--- 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java
+++ 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java
@@ -50,6 +50,7 @@ import 
org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
 import org.apache.kylin.engine.spark.merger.AfterBuildResourceMerger;
 import org.apache.kylin.engine.spark.storage.ParquetStorage;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.job.dao.JobStatistics;
 import org.apache.kylin.job.dao.JobStatisticsManager;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -97,8 +98,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.sparkproject.guava.collect.Sets;
 
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-
 import lombok.val;
 import scala.Option;
 import scala.runtime.AbstractFunction1;
@@ -624,9 +623,11 @@ public class NSparkCubingJobTest extends 
NLocalWithSparkSessionTest {
         NProjectManager.getInstance(getTestConfig()).updateProject(project, 
copyForWrite -> {
             LinkedHashMap<String, String> overrideKylinProps = 
copyForWrite.getOverrideKylinProps();
             
overrideKylinProps.put("kylin.engine.spark-conf.spark.locality.wait", "10");
+            
overrideKylinProps.put("kylin.engine.spark-conf.spark.kubernetes.file.upload.path",
 "/tmp");
         });
         // get SparkConfigOverride from project overrideProps
         KylinConfig config = executable.getConfig();
+        Assert.assertEquals("/tmp/" + executable.getId(), 
config.getKubernetesUploadPath());
         Assert.assertEquals(getTestConfig(), config.base());
         
Assert.assertNull(getTestConfig().getSparkConfigOverride().get("spark.locality.wait"));
         Assert.assertEquals("10", 
config.getSparkConfigOverride().get("spark.locality.wait"));

Reply via email to