This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 80a87f9d531d9325e5fdf1a0af839ba9f601dcef Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Fri Feb 24 13:55:21 2023 +0800 [DIRTY] Fix a build job is submitted in yarn cluster mode, it cannot be written into the driver log Co-authored-by: sibing.zhang <sibing.zh...@qq.com> --- .../java/org/apache/kylin/common/KylinConfigBase.java | 4 ++++ .../org/apache/kylin/common/KylinConfigBaseTest.java | 9 +++++++++ .../apache/kylin/job/execution/NExecutableManager.java | 5 +++++ .../org/apache/kylin/rest/service/JobServiceTest.java | 18 ++++++++++++++++++ 4 files changed, 36 insertions(+) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 853c5d6ce6..7f90613fad 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3780,6 +3780,10 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.build.segment-overlap-enabled", FALSE)); } + public boolean isJobTmpDirALLPermissionEnabled() { + return Boolean.parseBoolean(getOptional("kylin.engine.job-tmp-dir-all-permission-enabled", FALSE)); + } + public boolean isProjectMergeWithBloatEnabled() { return Boolean.parseBoolean(getOptional("kylin.query.project-merge-with-bloat-enabled", "true")); } diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java index 23f39ccd1f..5dfad30ca5 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java @@ -1397,6 +1397,15 @@ class KylinConfigBaseTest { assertTrue(config.isBuildSegmentOverlapEnabled()); } + @Test + void testIsJobTmpDirReadWritePermissionEnabled() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty("kylin.engine.job-tmp-dir-all-permission-enabled", "false"); + assertFalse(config.isJobTmpDirALLPermissionEnabled()); + config.setProperty("kylin.engine.job-tmp-dir-all-permission-enabled", "true"); + assertTrue(config.isJobTmpDirALLPermissionEnabled()); + } + @Test void testIsQuotaStorageEnabled() { KylinConfig config = KylinConfig.getInstanceFromEnv(); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java index 4f1b192f1d..31d1b9ead7 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java @@ -62,6 +62,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.persistence.transaction.UnitOfWork; @@ -1535,6 +1537,9 @@ public class NExecutableManager { Path path = new Path(resPath); FileSystem fs = HadoopUtil.getWorkingFileSystem(); dout = fs.create(path, true); + if (KylinConfig.getInstanceFromEnv().isJobTmpDirALLPermissionEnabled()) { + fs.setPermission(path.getParent(), new FsPermission(FsAction.ALL, FsAction.READ, FsAction.ALL)); + } JsonUtil.writeValue(dout, obj); } catch (Exception e) { // the operation to update output to hdfs failed, next task should not be interrupted. diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index 1bb130d731..27245e7046 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -61,7 +61,9 @@ import javax.servlet.http.HttpServletRequest; import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.persistence.metadata.Epoch; @@ -1903,6 +1905,22 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { Assert.assertEquals(result, jobService.getStepOutput("default", jobId, jobId)); } + @Test + public void testJobSubdirectoryPermission() throws IOException { + String jobId = "e1ad7bb0-522e-456a-859d-2eab1df448de"; + NExecutableManager manager = NExecutableManager.getInstance(jobService.getConfig(), "default"); + ExecutableOutputPO executableOutputPO = new ExecutableOutputPO(); + Map<String, String> info = Maps.newHashMap(); + info.put("nodes", "localhost:7070:all"); + executableOutputPO.setInfo(info); + overwriteSystemProp("kylin.engine.job-tmp-dir-all-permission-enabled", "true"); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + String file = KylinConfig.getInstanceFromEnv().getJobTmpOutputStorePath("default", jobId); + Path path = new Path(file); + manager.updateJobOutputToHDFS(file, executableOutputPO); + Assert.assertSame(FsAction.ALL, fs.getFileStatus(path.getParent()).getPermission().getOtherAction()); + } + @Test public void testExecutableResponse() throws Exception { val modelManager = mock(NDataModelManager.class);