This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit e62f6ea42fc420045597b4d9d35b06a9a1606f52 Author: luoping.zhang <luoping.zh...@kyligence.io> AuthorDate: Thu Apr 13 14:16:10 2023 +0800 KYLIN-5636 automatically clean up dependent files after the build task --- .../org/apache/kylin/common/KylinConfigBase.java | 8 ++++++ .../engine/spark/application/SparkApplication.java | 11 ++++++++ .../kylin/engine/spark/job/NSparkExecutable.java | 24 +++++++++-------- .../spark/application/SparkApplicationTest.java | 31 +++++++++++++++++----- .../engine/spark/job/NSparkCubingJobTest.java | 5 ++-- 5 files changed, 59 insertions(+), 20 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index b2f8f3d843..753d4e00bd 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3962,4 +3962,12 @@ public abstract class KylinConfigBase implements Serializable { return TimeUtil.timeStringAs(getOptional("kylin.multi-tenant.route-task-timeout", "30min"), TimeUnit.MILLISECONDS); } + + public String getKubernetesUploadPath() { + return getOptional(getKubernetesUploadPathKey()); + } + + public String getKubernetesUploadPathKey() { + return "kylin.engine.spark-conf.spark.kubernetes.file.upload.path"; + } } diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index bc0ba7b6d3..bae641e0b0 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -71,8 +71,11 @@ import org.apache.kylin.engine.spark.job.RestfulJobProgressReport; import org.apache.kylin.engine.spark.job.SparkJobConstants; import org.apache.kylin.engine.spark.job.UdfManager; import org.apache.kylin.engine.spark.scheduler.ClusterMonitor; +import org.apache.kylin.engine.spark.utils.HDFSUtils; import org.apache.kylin.engine.spark.utils.JobMetricsUtils; import org.apache.kylin.engine.spark.utils.SparkConfHelper; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.metadata.cube.model.NBatchConstants; import org.apache.kylin.metadata.model.NDataModel; @@ -417,6 +420,14 @@ public abstract class SparkApplication implements Application { } public void extraDestroy() { + if (config != null && StringUtils.isNotEmpty(config.getKubernetesUploadPath())) { + logger.info("uploadPath={}", config.getKubernetesUploadPath()); + try { + HDFSUtils.deleteMarkFile(config.getKubernetesUploadPath()); + } catch (Exception e) { + logger.warn("Failed to delete " + config.getKubernetesUploadPath(), e); + } + } if (clusterMonitor != null) { clusterMonitor.shutdown(); } diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index 9d30029819..f73da6d1bc 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -55,6 +55,11 @@ import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.StringHelper; import org.apache.kylin.engine.spark.merger.MetadataMerger; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.exception.JobStoppedException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -70,18 +75,12 @@ import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.project.NProjectManager; -import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin; import org.apache.kylin.metadata.view.LogicalView; import org.apache.kylin.metadata.view.LogicalViewManager; +import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - import lombok.val; /** @@ -372,6 +371,11 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage jobOverrides.put("kylin.engine.spark-conf." + SPARK_YARN_QUEUE, yarnQueue); } } + String path = kylinConfigExt.getKubernetesUploadPath(); + if (StringUtils.isNotEmpty(path)) { + jobOverrides.put(kylinConfigExt.getKubernetesUploadPathKey(), + path + "/" + StringUtils.defaultIfBlank(parentId, getId())); + } return KylinConfigExt.createInstance(kylinConfigExt, jobOverrides); } @@ -481,10 +485,8 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage String dataflowId = getDataflowId(); LogicalViewManager viewManager = LogicalViewManager.getInstance(config); if (StringUtils.isNotBlank(dataflowId)) { - Set<String> viewsMeta = viewManager - .findLogicalViewsInModel(getProject(), getDataflowId()) - .stream().map(LogicalView::getResourcePath) - .collect(Collectors.toSet()); + Set<String> viewsMeta = viewManager.findLogicalViewsInModel(getProject(), getDataflowId()).stream() + .map(LogicalView::getResourcePath).collect(Collectors.toSet()); dumpList.addAll(viewsMeta); } if (StringUtils.isNotBlank(table)) { diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java index 6ac8ae088e..cc44aedd0d 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java @@ -27,10 +27,14 @@ import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; import org.apache.kylin.engine.spark.job.KylinBuildEnv; +import org.apache.kylin.engine.spark.job.ParamsConstants; import org.apache.kylin.engine.spark.job.RestfulJobProgressReport; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NTableMetadataManager; @@ -45,11 +49,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; - -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - -import org.apache.kylin.engine.spark.job.ParamsConstants; +import org.springframework.test.util.ReflectionTestUtils; public class SparkApplicationTest extends NLocalWithSparkSessionTest { @@ -131,8 +131,7 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTest { Mockito.reset(report); Mockito.doReturn("http://sandbox.hortonworks.com:8088/proxy/application_1561370224051_0160/").when(application) .getTrackingUrl(null, ss); - Mockito.doReturn(Boolean.FALSE).when(report).updateSparkJobInfo(params, - "/kylin/api/jobs/spark", payloadJson); + Mockito.doReturn(Boolean.FALSE).when(report).updateSparkJobInfo(params, "/kylin/api/jobs/spark", payloadJson); Assert.assertFalse(report.updateSparkJobExtraInfo(params, "/kylin/api/jobs/spark", "test_job_output", "cb91189b-2b12-4527-aa35-0130e7d54ec0", extraInfo)); @@ -187,4 +186,22 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTest { Assert.assertTrue(sparkApplication.checkRangePartitionTableIsExist(nDataModel2)); } + @Test + public void testExtraDestroy() throws IOException { + KylinConfig config = getTestConfig(); + String path = tempDir.getPath() + "/upload"; + SparkApplication application = new SparkApplication() { + @Override + protected void doExecute() { + } + }; + File upload = new File(path); + FileUtils.forceMkdir(upload); + Assert.assertTrue(upload.exists()); + config.setProperty(config.getKubernetesUploadPathKey(), path); + ReflectionTestUtils.setField(application, "config", config); + application.extraDestroy(); + Assert.assertFalse(upload.exists()); + } + } diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java index a7a221a530..b86eb333f3 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java @@ -50,6 +50,7 @@ import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; import org.apache.kylin.engine.spark.builder.SnapshotBuilder; import org.apache.kylin.engine.spark.merger.AfterBuildResourceMerger; import org.apache.kylin.engine.spark.storage.ParquetStorage; +import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.job.dao.JobStatistics; import org.apache.kylin.job.dao.JobStatisticsManager; import org.apache.kylin.job.engine.JobEngineConfig; @@ -97,8 +98,6 @@ import org.junit.Test; import org.mockito.Mockito; import org.sparkproject.guava.collect.Sets; -import org.apache.kylin.guava30.shaded.common.collect.Maps; - import lombok.val; import scala.Option; import scala.runtime.AbstractFunction1; @@ -624,9 +623,11 @@ public class NSparkCubingJobTest extends NLocalWithSparkSessionTest { NProjectManager.getInstance(getTestConfig()).updateProject(project, copyForWrite -> { LinkedHashMap<String, String> overrideKylinProps = copyForWrite.getOverrideKylinProps(); overrideKylinProps.put("kylin.engine.spark-conf.spark.locality.wait", "10"); + overrideKylinProps.put("kylin.engine.spark-conf.spark.kubernetes.file.upload.path", "/tmp"); }); // get SparkConfigOverride from project overrideProps KylinConfig config = executable.getConfig(); + Assert.assertEquals("/tmp/" + executable.getId(), config.getKubernetesUploadPath()); Assert.assertEquals(getTestConfig(), config.base()); Assert.assertNull(getTestConfig().getSparkConfigOverride().get("spark.locality.wait")); Assert.assertEquals("10", config.getSparkConfigOverride().get("spark.locality.wait"));