This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 80a87f9d531d9325e5fdf1a0af839ba9f601dcef
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Fri Feb 24 13:55:21 2023 +0800

    [DIRTY] Fix: a build job submitted in yarn cluster mode cannot write 
to the driver log
    
    Co-authored-by: sibing.zhang <sibing.zh...@qq.com>
---
 .../java/org/apache/kylin/common/KylinConfigBase.java  |  4 ++++
 .../org/apache/kylin/common/KylinConfigBaseTest.java   |  9 +++++++++
 .../apache/kylin/job/execution/NExecutableManager.java |  5 +++++
 .../org/apache/kylin/rest/service/JobServiceTest.java  | 18 ++++++++++++++++++
 4 files changed, 36 insertions(+)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 853c5d6ce6..7f90613fad 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3780,6 +3780,10 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Boolean.parseBoolean(getOptional("kylin.build.segment-overlap-enabled", FALSE));
     }
 
+    public boolean isJobTmpDirALLPermissionEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.engine.job-tmp-dir-all-permission-enabled",
 FALSE));
+    }
+
     public boolean isProjectMergeWithBloatEnabled() {
         return 
Boolean.parseBoolean(getOptional("kylin.query.project-merge-with-bloat-enabled",
 "true"));
     }
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 23f39ccd1f..5dfad30ca5 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1397,6 +1397,15 @@ class KylinConfigBaseTest {
         assertTrue(config.isBuildSegmentOverlapEnabled());
     }
 
+    @Test
+    void testIsJobTmpDirReadWritePermissionEnabled() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.engine.job-tmp-dir-all-permission-enabled", 
"false");
+        assertFalse(config.isJobTmpDirALLPermissionEnabled());
+        config.setProperty("kylin.engine.job-tmp-dir-all-permission-enabled", 
"true");
+        assertTrue(config.isJobTmpDirALLPermissionEnabled());
+    }
+
     @Test
     void testIsQuotaStorageEnabled() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index 4f1b192f1d..31d1b9ead7 100644
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
@@ -1535,6 +1537,9 @@ public class NExecutableManager {
             Path path = new Path(resPath);
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             dout = fs.create(path, true);
+            if 
(KylinConfig.getInstanceFromEnv().isJobTmpDirALLPermissionEnabled()) {
+                fs.setPermission(path.getParent(), new 
FsPermission(FsAction.ALL, FsAction.READ, FsAction.ALL));
+            }
             JsonUtil.writeValue(dout, obj);
         } catch (Exception e) {
             // the operation to update output to hdfs failed, next task should 
not be interrupted.
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index 1bb130d731..27245e7046 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -61,7 +61,9 @@ import javax.servlet.http.HttpServletRequest;
 import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.persistence.metadata.Epoch;
@@ -1903,6 +1905,22 @@ public class JobServiceTest extends 
NLocalFileMetadataTestCase {
         Assert.assertEquals(result, jobService.getStepOutput("default", jobId, 
jobId));
     }
 
+    @Test
+    public void testJobSubdirectoryPermission() throws IOException {
+        String jobId = "e1ad7bb0-522e-456a-859d-2eab1df448de";
+        NExecutableManager manager = 
NExecutableManager.getInstance(jobService.getConfig(), "default");
+        ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+        Map<String, String> info = Maps.newHashMap();
+        info.put("nodes", "localhost:7070:all");
+        executableOutputPO.setInfo(info);
+        overwriteSystemProp("kylin.engine.job-tmp-dir-all-permission-enabled", 
"true");
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        String file = 
KylinConfig.getInstanceFromEnv().getJobTmpOutputStorePath("default", jobId);
+        Path path = new Path(file);
+        manager.updateJobOutputToHDFS(file, executableOutputPO);
+        Assert.assertSame(FsAction.ALL, 
fs.getFileStatus(path.getParent()).getPermission().getOtherAction());
+    }
+
     @Test
     public void testExecutableResponse() throws Exception {
         val modelManager = mock(NDataModelManager.class);

Reply via email to